Skip to content

Commit

Permalink
Merge pull request #596 from zinggAI/issue595
Browse files Browse the repository at this point in the history
Issue #595 code refactor
  • Loading branch information
sonalgoyal committed May 25, 2023
2 parents 8a67b0d + 736d414 commit 118cb07
Show file tree
Hide file tree
Showing 3 changed files with 26 additions and 21 deletions.
@@ -1,7 +1,5 @@
package zingg.common.client;

import java.util.Arrays;

import zingg.common.client.util.Util;

public enum ZinggOptions {
Expand All @@ -18,7 +16,8 @@ public enum ZinggOptions {
FIND_AND_LABEL("findAndLabel"),
ASSESS_MODEL("assessModel"),
PEEK_MODEL("peekModel"),
EXPORT_MODEL("exportModel");
EXPORT_MODEL("exportModel"),
RESOLVE("resolve");

private String value;

Expand Down
37 changes: 19 additions & 18 deletions common/core/src/main/java/zingg/common/core/util/PipeUtil.java
@@ -1,29 +1,18 @@
package zingg.common.core.util;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.stream.Collectors;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import zingg.common.client.Arguments;
import zingg.common.client.ZinggClientException;
import zingg.common.client.ZFrame;
import zingg.common.client.util.ColName;
import zingg.common.client.ZinggClientException;
import zingg.common.client.pipe.FilePipe;
//import zingg.common.client.pipe.InMemoryPipe;
import zingg.common.client.pipe.Pipe;
import scala.Option;
import scala.collection.JavaConverters;
import scala.collection.Seq;

import zingg.common.core.util.PipeUtilBase;
import zingg.common.client.util.ColName;

//import com.datastax.spark.connector.cql.*;
//import org.elasticsearch.spark.sql.api.java.JavaEsSparkSQL;
Expand Down Expand Up @@ -134,14 +123,23 @@ public ZFrame<D,R,C> joinTrainingSetstoGetLabels(ZFrame<D,R,C> jdbc,

/**
 * Reads the given pipes into one frame with extra-column handling switched off.
 *
 * <p>Convenience overload: forwards to the four-argument {@code readInternal}
 * with {@code addExtraCol} fixed to {@code false}.
 *
 * @param addLineNo forwarded unchanged to the four-argument overload
 * @param addSource forwarded unchanged to the four-argument overload
 * @param pipes the pipes to read
 * @return the frame produced by the four-argument overload
 * @throws ZinggClientException propagated from the four-argument overload
 */
public ZFrame<D,R,C> readInternal(boolean addLineNo, boolean addSource,
        Pipe<D,R,C>... pipes) throws ZinggClientException {
    // Callers of this shorter signature never opt in to extra-column merging.
    final boolean addExtraCol = false;
    return readInternal(addExtraCol, addLineNo, addSource, pipes);
}

public ZFrame<D,R,C> readInternal(boolean addExtraCol, boolean addLineNo,
boolean addSource, Pipe<D,R,C>... pipes) throws ZinggClientException {
ZFrame<D,R,C> input = null;

for (Pipe p : pipes) {
if (input == null) {
input = readInternal(p, addSource);
LOG.debug("input size is " + input.count());
} else {
if(!addExtraCol) {
input = input.union(readInternal(p, addSource));
} else {
input = input.unionByName(readInternal(p, addSource),true);
}
}
}
// we will probably need to create row number as string with pipename/id as
Expand All @@ -150,9 +148,7 @@ public ZFrame<D,R,C> readInternal(boolean addLineNo,
input = addLineNo(input); //new SparkFrame(new SparkDSUtil(getSession()).addRowNumber(input).df());
// we need to transform the input here by using stop words
return input;
}


}

public ZFrame<D,R,C> read(boolean addLineNo, boolean addSource, Pipe<D,R,C>... pipes) throws ZinggClientException {
ZFrame<D,R,C> rows = readInternal(addLineNo, addSource, pipes);
Expand All @@ -178,12 +174,17 @@ public ZFrame<D,R,C> sample(S spark, Pipe p) throws ZinggClientException {

/**
 * Reads the given pipes into one frame, repartitioned to {@code numPartitions},
 * with extra-column handling switched off.
 *
 * <p>Convenience overload: forwards to the five-argument {@code read} with
 * {@code addExtraCol} fixed to {@code false}. The pipes are read exactly once,
 * by that overload.
 *
 * @param addLineNo forwarded unchanged to the five-argument overload
 * @param numPartitions forwarded unchanged to the five-argument overload
 * @param addSource forwarded unchanged to the five-argument overload
 * @param pipes the pipes to read
 * @return the frame produced by the five-argument overload
 * @throws ZinggClientException propagated from the five-argument overload
 */
public ZFrame<D,R,C> read(boolean addLineNo, int numPartitions,
        boolean addSource, Pipe<D,R,C>... pipes) throws ZinggClientException {
    // Delegate straight away: reading here as well would load every pipe a
    // second time only to throw the result away.
    return read(false, addLineNo, numPartitions, addSource, pipes);
}

/**
 * Reads the given pipes into one frame, then repartitions and caches it.
 *
 * <p>The frame comes from the four-argument {@code readInternal}; it is then
 * passed through {@code repartition(numPartitions)} and {@code cache()} in
 * that order, and the cached frame is returned.
 *
 * @param addExtraCol forwarded unchanged to {@code readInternal}
 * @param addLineNo forwarded unchanged to {@code readInternal}
 * @param numPartitions partition count handed to {@code repartition}
 * @param addSource forwarded unchanged to {@code readInternal}
 * @param pipes the pipes to read
 * @return the repartitioned, cached frame
 * @throws ZinggClientException propagated from {@code readInternal}
 */
public ZFrame<D,R,C> read(boolean addExtraCol, boolean addLineNo, int numPartitions,
        boolean addSource, Pipe<D,R,C>... pipes) throws ZinggClientException {
    return readInternal(addExtraCol, addLineNo, addSource, pipes)
            .repartition(numPartitions)
            .cache();
}

public void write(ZFrame<D,R,C> toWriteOrig, Arguments args,
Pipe<D,R,C>... pipes) throws ZinggClientException {
try {
Expand Down
Expand Up @@ -14,6 +14,9 @@ public interface PipeUtilBase<S, D, R, C> {

/**
 * Reads the given pipes and returns them as a single frame.
 *
 * <p>NOTE(review): the exact meaning of the two flags is defined by the
 * implementing class and is not visible from this declaration; the names
 * suggest adding a line-number column and a source column - confirm.
 *
 * @param addLineNo line-number flag (semantics defined by the implementation)
 * @param addSource source flag (semantics defined by the implementation)
 * @param pipes the pipes to read
 * @return the frame built from the pipes
 * @throws ZinggClientException if the implementation fails to read
 */
public ZFrame<D, R, C> readInternal(boolean addLineNo,
boolean addSource, Pipe<D, R, C>... pipes) throws ZinggClientException;

/**
 * Reads the given pipes and returns them as a single frame, with an extra
 * switch controlling how the per-pipe frames are combined.
 *
 * <p>NOTE(review): in the PipeUtil implementation, {@code addExtraCol == false}
 * combines the frames with {@code union} and {@code true} uses
 * {@code unionByName(..., true)}; presumably the latter tolerates differing
 * column sets - confirm against {@code ZFrame.unionByName}.
 *
 * @param addExtraCol selects the combination strategy (see note above)
 * @param addLineNo line-number flag (semantics defined by the implementation)
 * @param addSource source flag (semantics defined by the implementation)
 * @param pipes the pipes to read
 * @return the frame built from the pipes
 * @throws ZinggClientException if the implementation fails to read
 */
public ZFrame<D,R,C> readInternal(boolean addExtraCol, boolean addLineNo,
boolean addSource, Pipe<D,R,C>... pipes) throws ZinggClientException;

/**
 * Reads the given pipes and returns the resulting frame.
 *
 * <p>NOTE(review): how this differs from {@code readInternal} with the same
 * arguments is decided by the implementing class and cannot be determined
 * from this declaration - check the implementation before relying on it.
 *
 * @param addLineNo line-number flag (semantics defined by the implementation)
 * @param addSource source flag (semantics defined by the implementation)
 * @param pipes the pipes to read
 * @return the frame built from the pipes
 * @throws ZinggClientException if the implementation fails to read
 */
public ZFrame<D, R, C> read(boolean addLineNo, boolean addSource, Pipe<D, R, C>... pipes)
throws ZinggClientException;
Expand All @@ -23,6 +26,8 @@ public ZFrame<D, R, C> read(boolean addLineNo, int numPartitions,
boolean addSource, Pipe<D, R, C>... pipes)
throws ZinggClientException;

/**
 * Reads the given pipes into a frame using the requested number of partitions,
 * with an extra switch controlling how the per-pipe frames are combined.
 *
 * <p>NOTE(review): the PipeUtil implementation obtains the frame from the
 * four-argument {@code readInternal}, then calls
 * {@code repartition(numPartitions)} and {@code cache()} on it; other
 * implementations may differ.
 *
 * @param addExtraCol forwarded to the read step (semantics defined by the implementation)
 * @param addLineNo line-number flag (semantics defined by the implementation)
 * @param numPartitions number of partitions requested for the result
 * @param addSource source flag (semantics defined by the implementation)
 * @param pipes the pipes to read
 * @return the frame built from the pipes
 * @throws ZinggClientException if the implementation fails to read
 */
public ZFrame<D,R,C> read(boolean addExtraCol, boolean addLineNo, int numPartitions,
boolean addSource, Pipe<D,R,C>... pipes) throws ZinggClientException;

/**
 * Writes a frame out through the given pipes.
 *
 * <p>NOTE(review): how {@code args} influences the write, and whether the
 * frame is transformed first, is not visible from this declaration - confirm
 * against the implementing class.
 *
 * @param toWriteOrig the frame to write
 * @param args job arguments made available to the implementation
 * @param pipes the destination pipes
 * @throws ZinggClientException if the implementation fails to write
 */
public void write(ZFrame<D, R, C> toWriteOrig, Arguments args, Pipe<D, R, C>... pipes)
throws ZinggClientException;
Expand Down

0 comments on commit 118cb07

Please sign in to comment.