New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
using data selector as wrapper over raw data #815
Changes from 2 commits
704c503
d6acbef
c14132c
6b53d07
02b3e46
5c5e845
d344cf1
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,52 @@ | ||
package zingg.common.core.data.df; | ||
|
||
import org.apache.commons.logging.Log; | ||
import org.apache.commons.logging.LogFactory; | ||
|
||
import zingg.common.client.IArguments; | ||
import zingg.common.client.ZFrame; | ||
import zingg.common.client.ZinggClientException; | ||
import zingg.common.client.util.ColName; | ||
import zingg.common.core.block.Canopy; | ||
import zingg.common.core.block.Tree; | ||
import zingg.common.core.context.Context; | ||
|
||
public class BlockedFrame<S, D, R, C, T> implements IZFrameProcessor<S, D, R, C, T> { | ||
|
||
protected ZFrame<D,R,C> originalDF; | ||
|
||
protected ZFrame<D,R,C> processedDF; | ||
|
||
protected IArguments args; | ||
|
||
protected Context<S,D,R,C,T> context; | ||
|
||
public static final Log LOG = LogFactory.getLog(BlockedFrame.class); | ||
|
||
public BlockedFrame(ZFrame<D, R, C> originalDF, IArguments args, Context<S,D,R,C,T> context) throws Exception, ZinggClientException { | ||
super(); | ||
this.originalDF = originalDF; | ||
this.args = args; | ||
this.context = context; | ||
this.processedDF = getBlocked(); | ||
} | ||
|
||
@Override | ||
public ZFrame<D, R, C> getOriginalDF() { | ||
return originalDF; | ||
} | ||
|
||
@Override | ||
public ZFrame<D, R, C> getProcessedDF() { | ||
return processedDF; | ||
} | ||
|
||
protected ZFrame<D, R, C> getBlocked() throws Exception, ZinggClientException { | ||
//testData = dropDuplicates(testData); | ||
Tree<Canopy<R>> tree = context.getBlockingTreeUtil().readBlockingTree(args); | ||
ZFrame<D, R, C> blocked = context.getBlockingTreeUtil().getBlockHashes(getOriginalDF(), tree); | ||
ZFrame<D, R, C> blocked1 = blocked.repartition(args.getNumPartitions(), blocked.col(ColName.HASH_COL));//.cache(); | ||
return blocked1; | ||
} | ||
|
||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,38 @@ | ||
package zingg.common.core.data.df; | ||
|
||
import org.apache.commons.logging.Log; | ||
import org.apache.commons.logging.LogFactory; | ||
|
||
import zingg.common.client.IArguments; | ||
import zingg.common.client.ZFrame; | ||
import zingg.common.client.cols.ZidAndFieldDefSelector; | ||
|
||
public class FieldDefFrame<S, D, R, C, T> implements IZFrameProcessor<S, D, R, C, T> { | ||
|
||
protected ZFrame<D,R,C> originalDF; | ||
|
||
protected ZFrame<D,R,C> processedDF; | ||
|
||
protected IArguments args; | ||
|
||
public static final Log LOG = LogFactory.getLog(FieldDefFrame.class); | ||
|
||
public FieldDefFrame(ZFrame<D, R, C> originalDF, IArguments args) { | ||
super(); | ||
this.originalDF = originalDF; | ||
this.args = args; | ||
this.processedDF = getOriginalDF().select(new ZidAndFieldDefSelector(args.getFieldDefinition()).getCols()); | ||
// return getDSUtil().getFieldDefColumnsDS(testDataOriginal, args, true); | ||
} | ||
|
||
@Override | ||
public ZFrame<D, R, C> getOriginalDF() { | ||
return originalDF; | ||
} | ||
|
||
@Override | ||
public ZFrame<D, R, C> getProcessedDF() { | ||
return processedDF; | ||
} | ||
|
||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,11 @@ | ||
package zingg.common.core.data.df; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. it may be better to introduce a method process() so that the logic is not in the constructor. |
||
|
||
import zingg.common.client.ZFrame; | ||
|
||
public interface IZFrameProcessor<S, D, R, C, T> { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. does it need S and T? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. BlockingTreeUtil<S, D,R,C,T> There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. StopWordsRemover<S,D,R,C,T> There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. so we should have one frame<D,R,C> and then another that extends this and adds S and T. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. what benefit it would give? ZData in any case has <S,D,R,C,T> There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. unnecessarily two very similar interfaces to be maintained whose objects can't be assigned to one another |
||
|
||
public ZFrame<D,R,C> getOriginalDF(); | ||
|
||
public ZFrame<D,R,C> getProcessedDF(); | ||
|
||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,45 @@ | ||
package zingg.common.core.data.df; | ||
|
||
import org.apache.commons.logging.Log; | ||
import org.apache.commons.logging.LogFactory; | ||
|
||
import zingg.common.client.IArguments; | ||
import zingg.common.client.ZFrame; | ||
import zingg.common.client.ZinggClientException; | ||
import zingg.common.core.preprocess.StopWordsRemover; | ||
|
||
public class PreprocessedFrame<S, D, R, C, T> implements IZFrameProcessor<S, D, R, C, T> { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. change stopword design - have a processor there. so preprocessedframe can be passed any processor(stopwords, tomorrow lowercase etc) |
||
|
||
protected ZFrame<D,R,C> originalDF; | ||
|
||
protected ZFrame<D,R,C> processedDF; | ||
|
||
protected IArguments args; | ||
|
||
protected StopWordsRemover<S, D, R, C, T> stopWordsRemover; | ||
|
||
public static final Log LOG = LogFactory.getLog(PreprocessedFrame.class); | ||
|
||
public PreprocessedFrame(ZFrame<D, R, C> originalDF, IArguments args, StopWordsRemover<S, D, R, C, T> stopWordsRemover) throws ZinggClientException { | ||
super(); | ||
this.originalDF = originalDF; | ||
this.args = args; | ||
this.stopWordsRemover = stopWordsRemover; | ||
this.processedDF = preprocess(); | ||
} | ||
|
||
protected ZFrame<D, R, C> preprocess() throws ZinggClientException { | ||
return this.stopWordsRemover.preprocessForStopWords(getOriginalDF()); | ||
} | ||
|
||
@Override | ||
public ZFrame<D, R, C> getOriginalDF() { | ||
return originalDF; | ||
} | ||
|
||
@Override | ||
public ZFrame<D, R, C> getProcessedDF() { | ||
return processedDF; | ||
} | ||
|
||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,37 @@ | ||
package zingg.common.core.data.df; | ||
|
||
import org.apache.commons.logging.Log; | ||
import org.apache.commons.logging.LogFactory; | ||
|
||
import zingg.common.client.IArguments; | ||
import zingg.common.client.ZFrame; | ||
import zingg.common.client.util.ColName; | ||
|
||
public class RepartitionFrame<S, D, R, C, T> implements IZFrameProcessor<S, D, R, C, T> { | ||
|
||
protected ZFrame<D,R,C> originalDF; | ||
|
||
protected ZFrame<D,R,C> processedDF; | ||
|
||
protected IArguments args; | ||
|
||
public static final Log LOG = LogFactory.getLog(RepartitionFrame.class); | ||
|
||
public RepartitionFrame(ZFrame<D, R, C> originalDF, IArguments args) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. we should pass the numberofPartitions and column on which we need to partition |
||
super(); | ||
this.originalDF = originalDF; | ||
this.args = args; | ||
this.processedDF = getOriginalDF().repartition(args.getNumPartitions(),getOriginalDF().col(ColName.ID_COL)); | ||
} | ||
|
||
@Override | ||
public ZFrame<D, R, C> getOriginalDF() { | ||
return originalDF; | ||
} | ||
|
||
@Override | ||
public ZFrame<D, R, C> getProcessedDF() { | ||
return processedDF; | ||
} | ||
|
||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,63 @@ | ||
package zingg.common.core.data.df; | ||
|
||
import org.apache.commons.logging.Log; | ||
import org.apache.commons.logging.LogFactory; | ||
|
||
import zingg.common.client.IArguments; | ||
import zingg.common.client.ZFrame; | ||
import zingg.common.client.ZinggClientException; | ||
import zingg.common.core.context.Context; | ||
import zingg.common.core.preprocess.StopWordsRemover; | ||
|
||
public class ZData<S, D, R, C, T> { | ||
|
||
protected ZFrame<D,R,C> rawData; | ||
protected IArguments args; | ||
protected Context<S,D,R,C,T> context; | ||
protected StopWordsRemover<S, D, R, C, T> stopWordsRemover; | ||
|
||
protected FieldDefFrame<S, D, R, C, T> fieldDefFrame; | ||
protected BlockedFrame<S, D, R, C, T> blockedData; | ||
protected PreprocessedFrame<S, D, R, C, T> preprocessedFrame; | ||
protected RepartitionFrame<S, D, R, C, T> repartitionFrame; | ||
|
||
public static final Log LOG = LogFactory.getLog(ZData.class); | ||
|
||
public ZData(ZFrame<D, R, C> rawData, IArguments args, Context<S,D,R,C,T> context,StopWordsRemover<S, D, R, C, T> stopWordsRemover) throws ZinggClientException { | ||
try { | ||
this.rawData = rawData; | ||
this.args = args; | ||
this.context = context; | ||
this.stopWordsRemover = stopWordsRemover; | ||
this.fieldDefFrame = new FieldDefFrame<S, D, R, C, T>(getRawData(),args); | ||
this.preprocessedFrame = new PreprocessedFrame<S, D, R, C, T>(getFieldDefFrame().getProcessedDF(),args,stopWordsRemover); | ||
this.repartitionFrame = new RepartitionFrame<S, D, R, C, T>(getPreprocessedFrame().getProcessedDF(),args); | ||
this.blockedData = new BlockedFrame<S, D, R, C, T>(getRepartitionFrame().getProcessedDF(), args, context); | ||
} catch (ZinggClientException e) { | ||
throw e; | ||
} catch (Exception e) { | ||
throw new ZinggClientException(e); | ||
} | ||
} | ||
|
||
public ZFrame<D, R, C> getRawData() { | ||
return rawData; | ||
} | ||
|
||
public FieldDefFrame<S, D, R, C, T> getFieldDefFrame() { | ||
return fieldDefFrame; | ||
} | ||
|
||
public PreprocessedFrame<S, D, R, C, T> getPreprocessedFrame() { | ||
return preprocessedFrame; | ||
} | ||
|
||
public RepartitionFrame<S, D, R, C, T> getRepartitionFrame() { | ||
return repartitionFrame; | ||
} | ||
|
||
public BlockedFrame<S, D, R, C, T> getBlockedFrame() throws Exception, ZinggClientException{ | ||
return blockedData; | ||
} | ||
|
||
} |
This file was deleted.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
better to have an overloaded constructor here which can take the fielddefselectors. we have a few so may need to pass those in the future