Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft PR for os all match changes #822

Open
wants to merge 14 commits into
base: os
Choose a base branch
from
@@ -0,0 +1,13 @@
package zingg.common.core.data.df;

import java.io.Serializable;

import zingg.common.client.ZFrame;

public interface IZFrameEnriched<D, R, C> extends Serializable{

public ZFrame<D,R,C> getOriginalDF();

public ZFrame<D,R,C> getProcessedDF();

}
96 changes: 96 additions & 0 deletions common/core/src/main/java/zingg/common/core/data/df/ZData.java
@@ -0,0 +1,96 @@
package zingg.common.core.data.df;

import java.util.List;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import zingg.common.client.IArguments;
import zingg.common.client.ZFrame;
import zingg.common.client.ZinggClientException;
import zingg.common.client.cols.SelectedCols;
import zingg.common.client.cols.ZidAndFieldDefSelector;
import zingg.common.client.util.ColName;
import zingg.common.core.context.Context;
import zingg.common.core.data.df.controller.BlockedDataController;
import zingg.common.core.data.df.controller.FieldDefDataController;
import zingg.common.core.data.df.controller.PreprocessorDataController;
import zingg.common.core.data.df.controller.RepartitionDataController;
import zingg.common.core.preprocess.IPreProcessor;

public class ZData<S, D, R, C, T> {

protected ZFrame<D,R,C> rawData;
protected IArguments args;
protected Context<S,D,R,C,T> context;
protected List<IPreProcessor<S,D,R,C,T>> preProcessors;

protected ZFrameEnriched<D, R, C> fieldDefFrame;
protected ZFrameEnriched<D, R, C> blockedFrame;
protected ZFrameEnriched<D, R, C> preprocessedFrame;
protected ZFrameEnriched<D, R, C> repartitionFrame;

public static final Log LOG = LogFactory.getLog(ZData.class);

public ZData(ZFrame<D, R, C> rawData, IArguments args, Context<S,D,R,C,T> context,List<IPreProcessor<S,D,R,C,T>> preProcessors) throws ZinggClientException {
this.rawData = rawData;
this.args = args;
this.context = context;
this.preProcessors = preProcessors;
}

public ZFrame<D, R, C> getRawData() {
return rawData;
}

public ZFrameEnriched<D, R, C> getFieldDefFrame() throws ZinggClientException {
if (fieldDefFrame==null) {
ZFrame<D, R, C> originalDF = getRawData();
FieldDefDataController<D, R, C> controller = new FieldDefDataController<D, R, C>(args.getFieldDefinition(),
getColSelector());
this.fieldDefFrame = new ZFrameEnriched<D, R, C>(originalDF, controller.process(originalDF));
}
return fieldDefFrame;
}

public ZFrameEnriched<D, R, C> getPreprocessedFrame() throws ZinggClientException {
if (preprocessedFrame==null) {
ZFrame<D, R, C> originalDF = getFieldDefFrame().getProcessedDF();
PreprocessorDataController<S, D, R, C, T> controller = new PreprocessorDataController<S, D, R, C, T>(
preProcessors);
this.preprocessedFrame = new ZFrameEnriched<D, R, C>(originalDF, controller.process(originalDF));
}
return preprocessedFrame;
}

public ZFrameEnriched<D, R, C> getRepartitionFrame() throws ZinggClientException {
if (repartitionFrame==null) {
ZFrame<D, R, C> originalDF = getPreprocessedFrame().getProcessedDF();
RepartitionDataController<D, R, C> controller = new RepartitionDataController<D, R, C>(
args.getNumPartitions(), ColName.ID_COL);
this.repartitionFrame = new ZFrameEnriched<D, R, C>(originalDF, controller.process(originalDF));
}
return repartitionFrame;
}

public ZFrameEnriched<D, R, C> getBlockedFrame() throws ZinggClientException {
if (blockedFrame==null) {
try {
ZFrame<D, R, C> originalDF = getRepartitionFrame().getProcessedDF();
BlockedDataController<S, D, R, C, T> controller = new BlockedDataController<S, D, R, C, T>(args,
context.getBlockingTreeUtil());
this.blockedFrame = new ZFrameEnriched<D, R, C>(originalDF, controller.process(originalDF));
} catch (ZinggClientException zce) {
throw zce;
} catch (Exception e) {
throw new ZinggClientException(e);
}
}
return blockedFrame;
}

protected SelectedCols getColSelector() {
return new ZidAndFieldDefSelector(args.getFieldDefinition());
}

}
40 changes: 40 additions & 0 deletions common/core/src/main/java/zingg/common/core/data/df/ZDataPair.java
@@ -0,0 +1,40 @@
package zingg.common.core.data.df;

import zingg.common.client.ZFrame;
import zingg.common.core.pairs.IPairBuilder;

public class ZDataPair<S, D, R, C, T> {

protected ZFrame<D,R,C> data1;
protected ZFrame<D,R,C> data2;
protected ZFrame<D,R,C> pairs;
protected IPairBuilder<S, D, R, C> iPairBuilder;

public ZDataPair(ZFrame<D, R, C> data1, ZFrame<D, R, C> data2, IPairBuilder<S, D, R, C> iPairBuilder) {
this.data1 = data1;
this.data2 = data2;
this.iPairBuilder = iPairBuilder;
}

public ZFrame<D, R, C> getData1() {
return data1;
}

public ZFrame<D, R, C> getData2() {
return data2;
}

public ZFrame<D, R, C> getPairs() throws Exception {
if (pairs==null) {
pairs = iPairBuilder.getPairs(data1, data2);
}
return pairs;
}

public IPairBuilder<S, D, R, C> getIPairBuilder() {
return iPairBuilder;
}



}
@@ -0,0 +1,35 @@
package zingg.common.core.data.df;

import zingg.common.client.ZFrame;

public class ZFrameEnriched<D, R, C> implements IZFrameEnriched<D, R, C> {
private static final long serialVersionUID = 1L;

protected ZFrame<D,R,C> originalDF;

protected ZFrame<D,R,C> processedDF;

public ZFrameEnriched(ZFrame<D, R, C> originalDF) {
this.originalDF = originalDF;
}

public ZFrameEnriched(ZFrame<D, R, C> originalDF, ZFrame<D, R, C> processedDF) {
this.originalDF = originalDF;
this.processedDF = processedDF;
}

@Override
public ZFrame<D, R, C> getOriginalDF() {
return originalDF;
}

@Override
public ZFrame<D, R, C> getProcessedDF() {
return processedDF;
}

public void setProcessedDF(ZFrame<D, R, C> processedDF) {
this.processedDF = processedDF;
}

}
@@ -0,0 +1,44 @@
package zingg.common.core.data.df.controller;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import zingg.common.client.IArguments;
import zingg.common.client.ZFrame;
import zingg.common.client.ZinggClientException;
import zingg.common.client.util.ColName;
import zingg.common.core.block.Canopy;
import zingg.common.core.block.Tree;
import zingg.common.core.util.BlockingTreeUtil;

public class BlockedDataController<S, D, R, C, T> implements IDataController<D, R, C> {
protected IArguments args;

protected BlockingTreeUtil<S, D, R, C, T> blockingTreeUtil;

public static final Log LOG = LogFactory.getLog(BlockedDataController.class);

public BlockedDataController(IArguments args, BlockingTreeUtil<S, D, R, C, T> blockingTreeUtil) throws Exception, ZinggClientException {
this.args = args;
this.blockingTreeUtil = blockingTreeUtil;
}

protected ZFrame<D, R, C> getBlocked(ZFrame<D,R,C> originalDF) throws ZinggClientException {
try {
Tree<Canopy<R>> tree = blockingTreeUtil.readBlockingTree(args);
ZFrame<D, R, C> blocked = blockingTreeUtil.getBlockHashes(originalDF, tree);
ZFrame<D, R, C> blocked1 = blocked.repartition(args.getNumPartitions(), blocked.col(ColName.HASH_COL));//.cache();
return blocked1;
} catch (ZinggClientException e) {
throw e;
} catch (Exception e) {
throw new ZinggClientException(e);
}
}

@Override
public ZFrame<D, R, C> process(ZFrame<D,R,C> originalDF) throws ZinggClientException {
return getBlocked(originalDF);
}

}
@@ -0,0 +1,36 @@
package zingg.common.core.data.df.controller;

import java.util.List;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import zingg.common.client.FieldDefinition;
import zingg.common.client.ZFrame;
import zingg.common.client.ZinggClientException;
import zingg.common.client.cols.SelectedCols;
import zingg.common.client.cols.ZidAndFieldDefSelector;

public class FieldDefDataController<D, R, C> implements IDataController<D, R, C> {
@Override
public ZFrame<D, R, C> process(ZFrame<D,R,C> originalDF) throws ZinggClientException {
return originalDF.select(selectedCols.getCols());
}

protected SelectedCols selectedCols;

protected List<? extends FieldDefinition> fieldDefinition;

public static final Log LOG = LogFactory.getLog(FieldDefDataController.class);

public FieldDefDataController(List<? extends FieldDefinition> fieldDefinition) {
this(fieldDefinition,new ZidAndFieldDefSelector(fieldDefinition));
}

public FieldDefDataController(List<? extends FieldDefinition> fieldDefinition,
SelectedCols selectedCols) {
this.fieldDefinition = fieldDefinition;
this.selectedCols = selectedCols;
}

}
@@ -0,0 +1,9 @@
package zingg.common.core.data.df.controller;

import zingg.common.client.ZFrame;
import zingg.common.client.ZinggClientException;

public interface IDataController<D,R,C> {

public ZFrame<D,R,C> process(ZFrame<D,R,C> originalDF) throws ZinggClientException;
}
@@ -0,0 +1,32 @@
package zingg.common.core.data.df.controller;

import java.util.List;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import zingg.common.client.ZFrame;
import zingg.common.client.ZinggClientException;
import zingg.common.core.preprocess.IPreProcessor;

public class PreprocessorDataController<S,D,R,C,T> implements IDataController<D, R, C> {

protected List<IPreProcessor<S,D,R,C,T>> preProcessors;

public static final Log LOG = LogFactory.getLog(PreprocessorDataController.class);

public PreprocessorDataController(List<IPreProcessor<S,D,R,C,T>> preProcessors) throws ZinggClientException {
this.preProcessors = preProcessors;
}

@Override
public ZFrame<D, R, C> process(ZFrame<D,R,C> originalDF) throws ZinggClientException {
ZFrame<D,R,C> processedDF = originalDF;
if (preProcessors != null) {
for (IPreProcessor<S, D, R, C, T> iPreProcessor : preProcessors) {
processedDF = iPreProcessor.preprocess(processedDF);
}
}
return processedDF;
}
}
@@ -0,0 +1,30 @@
package zingg.common.core.data.df.controller;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import zingg.common.client.IArguments;
import zingg.common.client.ZFrame;
import zingg.common.client.ZinggClientException;

public class RepartitionDataController<D, R, C> implements IDataController<D, R, C> {

protected IArguments args;

protected int numPartitions;

protected String partitionCol;

public static final Log LOG = LogFactory.getLog(RepartitionDataController.class);

public RepartitionDataController(int numPartitions, String partitionCol) {
this.numPartitions = numPartitions;
this.partitionCol = partitionCol;
}

@Override
public ZFrame<D, R, C> process(ZFrame<D,R,C> originalDF) throws ZinggClientException {
return originalDF.repartition(numPartitions,originalDF.col(partitionCol));
}

}
12 changes: 7 additions & 5 deletions common/core/src/main/java/zingg/common/core/executor/Linker.java
Expand Up @@ -7,6 +7,8 @@
import zingg.common.client.ZinggClientException;
import zingg.common.client.options.ZinggOptions;
import zingg.common.client.util.ColName;
import zingg.common.core.data.df.ZData;
import zingg.common.core.data.df.ZDataPair;
import zingg.common.core.filter.PredictionFilter;
import zingg.common.core.pairs.SelfPairBuilderSourceSensitive;

Expand All @@ -28,19 +30,19 @@ public ZFrame<D,R,C> selectColsFromBlocked(ZFrame<D,R,C> blocked) {
}

@Override
public ZFrame<D,R,C> getPairs(ZFrame<D,R,C>blocked, ZFrame<D,R,C>bAll) throws Exception{
public ZDataPair<S, D, R, C, T> getPairs(ZFrame<D,R,C>blocked, ZFrame<D,R,C>bAll) throws Exception{
return getPairs(blocked, bAll, new SelfPairBuilderSourceSensitive<S, D, R, C> (getDSUtil(),args));
}

@Override
protected ZFrame<D,R,C> getActualDupes(ZFrame<D,R,C> blocked, ZFrame<D,R,C> testData) throws Exception, ZinggClientException{
protected ZFrame<D,R,C> getActualDupes(ZData<S,D,R,C,T> rawData) throws Exception, ZinggClientException{
PredictionFilter<D, R, C> predictionFilter = new PredictionFilter<D, R, C>();
SelfPairBuilderSourceSensitive<S, D, R, C> iPairBuilder = new SelfPairBuilderSourceSensitive<S, D, R, C> (getDSUtil(),args);
return getActualDupes(blocked, testData,predictionFilter, iPairBuilder, null);
return getActualDupes(rawData,predictionFilter, iPairBuilder, null);
}

@Override
public void writeOutput(ZFrame<D,R,C> sampleOrginal, ZFrame<D,R,C> dupes) throws ZinggClientException {
public void writeOutput( ZData<S,D,R,C,T> rawData, ZFrame<D,R,C> dupes) throws ZinggClientException {
try {
// input dupes are pairs
/// pick ones according to the threshold by user
Expand All @@ -55,7 +57,7 @@ public void writeOutput(ZFrame<D,R,C> sampleOrginal, ZFrame<D,R,C> dupes) throws
dupesActual = dupesActual.withColumn(ColName.CLUSTER_COLUMN, dupesActual.col(ColName.ID_COL));
dupesActual = getDSUtil().addUniqueCol(dupesActual, ColName.CLUSTER_COLUMN);
ZFrame<D,R,C>dupes2 = getDSUtil().alignLinked(dupesActual, args);
dupes2 = getDSUtil().postprocessLinked(dupes2, sampleOrginal);
dupes2 = getDSUtil().postprocessLinked(dupes2, rawData.getFieldDefFrame().getProcessedDF());
LOG.debug("uncertain output schema is " + dupes2.showSchema());
getPipeUtil().write(dupes2, args.getOutput());
}
Expand Down