Merge pull request #562 from zinggAI/0.3.5
issue #558 linker not working
sonalgoyal committed May 17, 2023
2 parents 351591c + 168d744 commit 8a67b0d
Showing 4 changed files with 16 additions and 16 deletions.
@@ -47,6 +47,7 @@ public interface ZFrame<D, R, C> {
public ZFrame<D, R, C> dropDuplicates(String[] c);

public ZFrame<D, R, C> drop(String c);
+ public ZFrame<D, R, C> drop(C c);
public ZFrame<D, R, C> drop(String... c);
public ZFrame<D, R, C> except(ZFrame<D, R, C> c);

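The new overload matters because a name-based drop cannot distinguish two columns that share a name after a join, whereas a typed column handle (C) can. A minimal caller-side sketch against the generic interface, using only ZFrame methods that appear elsewhere in this diff; the "z_id" and "z_source" column names are placeholders, not Zingg's actual constants:

static <D, R, C> ZFrame<D, R, C> joinAndKeepRightSource(ZFrame<D, R, C> left, ZFrame<D, R, C> right) {
    ZFrame<D, R, C> joined = left.joinOnCol(right, "z_id");
    // joined.drop("z_source") could not say which of the two same-named
    // columns to remove; the typed overload targets exactly one handle:
    return joined.drop(left.col("z_source"));
}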
20 changes: 7 additions & 13 deletions common/core/src/main/java/zingg/common/core/util/DSUtil.java
@@ -103,27 +103,27 @@ public ZFrame<D, R, C> joinOnNamedColAndDropIt(ZFrame<D, R, C> lines, ZFrame<D,

public ZFrame<D, R, C> joinWithItself(ZFrame<D, R, C> lines, String joinColumn, boolean filter) throws Exception {
ZFrame<D, R, C> lines1 = getPrefixedColumnsDS(lines);
System.out.println("prefixed");
lines1.show(true);
return join(lines, lines1, joinColumn, filter);
}

public ZFrame<D, R, C> joinWithItselfSourceSensitive(ZFrame<D, R, C> lines, String joinColumn, Arguments args) throws Exception {

ZFrame<D, R, C> lines1 = getPrefixedColumnsDS(lines).cache();

String[] sourceNames = args.getPipeNames();
- lines = lines.filter(lines.equalTo(joinColumn, sourceNames[0]));
+ lines = lines.filter(lines.equalTo(ColName.SOURCE_COL, sourceNames[0]));
lines1 = lines1.filter(lines1.notEqual(ColName.COL_PREFIX + ColName.SOURCE_COL, sourceNames[0]));
return join(lines, lines1, joinColumn, false);
}

public ZFrame<D, R, C> alignLinked(ZFrame<D, R, C> dupesActual, Arguments args) {
dupesActual = dupesActual.cache();
dupesActual = dupesActual.withColumnRenamed(ColName.ID_COL, ColName.CLUSTER_COLUMN);
List<C> cols = new ArrayList<C>();
cols.add(dupesActual.col(ColName.CLUSTER_COLUMN));
cols.add(dupesActual.col(ColName.ID_COL));
cols.add(dupesActual.col(ColName.SCORE_COL));

for (FieldDefinition def: args.getFieldDefinition()) {
cols.add(dupesActual.col(def.fieldName));
}
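In joinWithItselfSourceSensitive above, the left-hand filter previously compared the value of the join column to a pipe name; for a normal join key that matches virtually nothing, which is the likely reason the linker produced no output (#558). The fix filters on the source column instead, so the left side keeps records of the first pipe and the prefixed right side keeps the rest. A plain-Spark sketch of that split, with illustrative column names ("z_source" and a "z_" prefix) standing in for Zingg's internal ColName constants:

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

class SourceSensitiveJoinSketch {
    // Keep first-pipe records on the left and everything else on the
    // (column-prefixed) right, so the join only pairs records that come
    // from different input pipes.
    static Dataset<Row> crossSourcePairs(Dataset<Row> lines, Dataset<Row> prefixed,
                                         String joinColumn, String firstSource) {
        Dataset<Row> left = lines.filter(lines.col("z_source").equalTo(firstSource));
        Dataset<Row> right = prefixed.filter(prefixed.col("z_z_source").notEqual(firstSource));
        return left.join(right, left.col(joinColumn).equalTo(right.col("z_" + joinColumn)));
    }
}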
@@ -277,10 +277,6 @@ public List<FieldDefinition> getFieldDefinitionFiltered(Arguments args, MatchTy
}

public ZFrame<D,R,C> postprocess(ZFrame<D,R,C> actual, ZFrame<D,R,C> orig) {
System.out.println("postproc actual");
actual.show(true);
System.out.println("postproc orig");
orig.show(true);
List<C> cols = new ArrayList<C>();
cols.add(actual.col(ColName.CLUSTER_COLUMN));
cols.add(actual.col(ColName.ID_COL));
@@ -289,8 +285,6 @@ public ZFrame<D,R,C> postprocess(ZFrame<D,R,C> actual, ZFrame<D,R,C> orig) {
cols.add(actual.col(ColName.MATCH_FLAG_COL));

ZFrame<D,R,C> zFieldsFromActual = actual.select(cols);
System.out.println("postproc selected");
zFieldsFromActual.show(true);
ZFrame<D,R,C> joined = zFieldsFromActual.joinOnCol(orig, ColName.ID_COL);

return joined;
@@ -305,9 +299,9 @@ public ZFrame<D,R,C> postprocessLinked(ZFrame<D,R,C> actual, ZFrame<D,R,C> orig)

ZFrame<D,R,C> zFieldsFromActual = actual.select(cols);
ZFrame<D,R,C> joined = zFieldsFromActual.join(orig,ColName.ID_COL,ColName.SOURCE_COL)
- .drop(ColName.SOURCE_COL)
+ .drop(zFieldsFromActual.col(ColName.SOURCE_COL))
.drop(ColName.ID_COL);

return joined;
}
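The second half of the fix is the drop in postprocessLinked. The join leaves a ColName.SOURCE_COL on both sides (which is why one of them is dropped afterwards), and in Spark a name-based drop removes every column with a matching name, so the old code would strip the source flag from the linked output altogether; dropping the specific Column keeps the copy coming from orig. A minimal plain-Spark sketch of that difference, with placeholder column names:

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

class DropByColumnSketch {
    // left and right are assumed to both carry "z_zid" and "z_source"
    // columns; the names are illustrative, not taken from this diff.
    static Dataset<Row> keepRightSource(Dataset<Row> left, Dataset<Row> right) {
        Dataset<Row> joined = left.join(right, left.col("z_zid").equalTo(right.col("z_zid")));
        // joined.drop("z_source") would remove BOTH same-named copies;
        // dropping by Column reference removes only the left-side one.
        return joined.drop(left.col("z_source"));
    }
}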

4 changes: 2 additions & 2 deletions examples/febrl/configLink.json
@@ -14,7 +14,7 @@
},
{
"fieldName" : "stNo",
"matchType": "exact",
"matchType": "fuzzy",
"fields" : "stNo",
"dataType": "string"
},
@@ -38,7 +38,7 @@
},
{
"fieldName" : "areacode",
"matchType": "exact",
"matchType": "fuzzy",
"fields" : "areacode",
"dataType": "string"
},
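In the example linker config, stNo and areacode move from exact to fuzzy matching, so small data-entry differences in those fields no longer rule out otherwise matching records. The toy sketch below only illustrates the distinction and is not how Zingg implements its match types:

class MatchTypeSketch {
    // exact: no tolerance at all.
    static boolean exactMatch(String a, String b) {
        return a != null && a.equals(b);
    }

    // fuzzy (conceptually): near-identical values, e.g. "2612" vs "2612 "
    // or "421" vs "42l", should still be treated as likely matches.
    static boolean fuzzyMatch(String a, String b) {
        if (a == null || b == null) return false;
        return levenshtein(a.trim().toLowerCase(), b.trim().toLowerCase()) <= 1;
    }

    static int levenshtein(String a, String b) {
        int[][] d = new int[a.length() + 1][b.length() + 1];
        for (int i = 0; i <= a.length(); i++) d[i][0] = i;
        for (int j = 0; j <= b.length(); j++) d[0][j] = j;
        for (int i = 1; i <= a.length(); i++)
            for (int j = 1; j <= b.length(); j++)
                d[i][j] = Math.min(Math.min(d[i - 1][j] + 1, d[i][j - 1] + 1),
                        d[i - 1][j - 1] + (a.charAt(i - 1) == b.charAt(j - 1) ? 0 : 1));
        return d[a.length()][b.length()];
    }
}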
@@ -130,7 +130,12 @@ public ZFrame<Dataset<Row>, Row, Column> dropDuplicates(String c, String... d) {
public ZFrame<Dataset<Row>, Row, Column> drop(String c) {
return new SparkFrame(df.drop(c));
}


+ @Override
+ public ZFrame<Dataset<Row>, Row, Column> drop(Column c) {
+ return new SparkFrame(df.drop(c));
+ }

public ZFrame<Dataset<Row>, Row, Column> except(ZFrame<Dataset<Row>, Row, Column> c) {
return new SparkFrame(df.except(c.df()));
}
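The Spark-side implementation of the new overload follows the same delegate-and-rewrap pattern as the name-based drop above it: pass the Column straight to Dataset.drop and wrap the result in a fresh SparkFrame. A short usage sketch, assuming df is an existing Dataset<Row> and "z_source" is a placeholder column name:

SparkFrame frame = new SparkFrame(df);
ZFrame<Dataset<Row>, Row, Column> trimmed = frame.drop(df.col("z_source"));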
