Canopy.java
package zingg.common.core.block;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import zingg.common.client.FieldDefinition;
import zingg.common.client.util.ColName;
import zingg.common.client.util.ListMap;
import zingg.common.core.hash.HashFunction;

public class Canopy<R> implements Serializable {

    public static final Log LOG = LogFactory.getLog(Canopy.class);

    // created by the function edge leading from parent to this node
    protected HashFunction function;
    // applied on this field
    protected FieldDefinition context;
    // list of duplicates passed from parent
    protected List<R> dupeN;
    // number of duplicates eliminated after function applied on fn context
    protected long elimCount;
    // hash of canopy
    protected Object hash;
    // training set
    protected List<R> training;
    // duplicates remaining after function is applied
    protected List<R> dupeRemaining;
    public Canopy() {
    }

    public Canopy(List<R> training, List<R> dupeN) {
        this.training = training; //.cache();
        this.dupeN = dupeN;
    }

    public Canopy(List<R> training, List<R> dupeN, HashFunction function,
            FieldDefinition context) {
        this(training, dupeN);
        this.function = function;
        this.context = context;
        // prepare();
    }
    /**
     * @return the function
     */
    public HashFunction getFunction() {
        return function;
    }

    /**
     * @param function the function to set
     */
    public void setFunction(HashFunction function) {
        this.function = function;
    }

    /**
     * @return the context
     */
    public FieldDefinition getContext() {
        return context;
    }

    /**
     * @param context the context to set
     */
    public void setContext(FieldDefinition context) {
        this.context = context;
    }

    /**
     * @return the dupeN
     */
    public List<R> getDupeN() {
        return dupeN;
    }

    /**
     * @param dupeN the dupeN to set
     */
    public void setDupeN(List<R> dupeN) {
        this.dupeN = dupeN;
    }

    /**
     * @return the elimCount
     */
    public long getElimCount() {
        return elimCount;
    }

    /**
     * @param elimCount the elimCount to set
     */
    public void setElimCount(long elimCount) {
        this.elimCount = elimCount;
    }

    /**
     * @return the hash
     */
    public Object getHash() {
        return hash;
    }

    /**
     * @param hash the hash to set
     */
    public void setHash(Object hash) {
        this.hash = hash;
    }

    /**
     * @return the training
     */
    public List<R> getTraining() {
        return training;
    }

    /**
     * @param training the training to set
     */
    public void setTraining(List<R> training) {
        this.training = training;
    }
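    /**
     * Splits the training records of this canopy into child canopies, one per
     * distinct hash produced by applying the hash function to the context field.
     * Each child canopy receives the duplicate pairs currently left in
     * {@code dupeRemaining}.
     *
     * @return one child canopy per distinct hash of the context field
     */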
    public List<Canopy<R>> getCanopies() {
        //long ts = System.currentTimeMillis();
        /*
        List<R> newTraining = function.apply(training, context.fieldName, ColName.HASH_COL).cache();
        LOG.debug("getCanopies0" + (System.currentTimeMillis() - ts));
        List<Canopy> returnCanopies = new ArrayList<Canopy>();
        //first find unique hashes
        //then split the training into per hash
        List<R> uniqueHashes = newTraining.select(ColName.HASH_COL).distinct().collectAsList();
        LOG.debug("getCanopies1" + (System.currentTimeMillis() - ts));
        for (Row row : uniqueHashes) {
            Object key = row.get(0);
            List<R> tupleList = newTraining.filter(newTraining.col(ColName.HASH_COL).equalTo(key))
                    .cache();
            tupleList = tupleList.drop(ColName.HASH_COL);
            Canopy can = new Canopy(tupleList, dupeRemaining);
            //LOG.debug(" canopy size is " + tupleList.count() + " for hash "
            //        + key);
            can.hash = key;
            returnCanopies.add(can);
        }
        LOG.debug("getCanopies2" + (System.currentTimeMillis() - ts));
        return returnCanopies;*/
        ListMap<Object, R> hashes = new ListMap<Object, R>();
        List<Canopy<R>> returnCanopies = new ArrayList<Canopy<R>>();
        for (R r : training) {
            hashes.add(function.apply(r, context.fieldName), r);
        }
        for (Object o : hashes.keySet()) {
            Canopy<R> can = new Canopy<R>(hashes.get(o), dupeRemaining);
            can.hash = o;
            returnCanopies.add(can);
        }
        hashes = null;
        //LOG.debug("getCanopies2" + (System.currentTimeMillis() - ts));
        return returnCanopies;
    }
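    /**
     * Estimates how many child canopies {@link #getCanopies()} would create by
     * counting the distinct hash values of the context field over the training
     * records, without materialising the canopies themselves.
     *
     * @return the number of distinct hashes seen in the training set
     */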
    public long estimateCanopies() {
        //long ts = System.currentTimeMillis();
        Set<Object> hashes = new HashSet<Object>();
        for (R r : training) {
            hashes.add(function.apply(r, context.fieldName));
        }
        /*
        List<R> newTraining = function.apply(training, context.fieldName, ColName.HASH_COL);
        long uniqueHashes = (long) newTraining.select(functions.approxCountDistinct(
                newTraining.col(ColName.HASH_COL))).takeAsList(1).get(0).get(0);
        LOG.debug("estimateCanopies" + (System.currentTimeMillis() - ts) + " and count is " + uniqueHashes);
        /*newTraining.agg(
                functions.approxCountDistinct(newTraining.col(ColName.HASH_COL))).show();
        long ts1 = System.currentTimeMillis();
        long uniqueHashes = newTraining.select(newTraining.col(ColName.HASH_COL)).distinct().count(); //.distinct().count();
        LOG.warn("estimateCanopies" + (System.currentTimeMillis() - ts1) + " and count is " + uniqueHashes);
        */
        long uniqueHashes = hashes.size();
        LOG.debug("estimateCanopies- unique hash count is " + uniqueHashes);
        return uniqueHashes;
    }
    public long getTrainingSize() {
        return training.size();
    }
    /*
     * (non-Javadoc)
     *
     * @see java.lang.Object#toString()
     */
    @Override
    public String toString() {
        String str = "";
        if (context != null) {
            str = "Canopy [function=" + function + ", context=" + context.fieldName
                    + ", elimCount=" + elimCount + ", hash=" + hash;
        }
        else {
            str = "Canopy [function=" + function + ", context=" + context
                    + ", elimCount=" + elimCount + ", hash=" + hash;
        }
        if (training != null) {
            str += ", training=" + training.size();
        }
        str += "]";
        return str;
    }
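    /**
     * Applies the hash function to both records of every labelled duplicate pair:
     * once on the context field and once on the paired record's column, addressed
     * with the {@code ColName.COL_PREFIX} prefix. Pairs whose hashes still match
     * (or are both null) are kept in {@code dupeRemaining}; pairs whose hashes
     * differ would be separated by this blocking function, and their count is
     * recorded in {@code elimCount}.
     */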
    public void estimateElimCount() {
        //long ts = System.currentTimeMillis();
        //the function is applied to both columns
        //if the hashes are equal, the pair is not going to be eliminated
        //filter on hash equal and count
        LOG.debug("Applying " + function.getName());
        dupeRemaining = new ArrayList<R>();
        for (R r : dupeN) {
            Object hash1 = function.apply(r, context.fieldName);
            Object hash2 = function.apply(r, ColName.COL_PREFIX + context.fieldName);
            LOG.debug("hash1 " + hash1);
            LOG.debug("hash2 " + hash2);
            if (hash1 == null && hash2 == null) {
                dupeRemaining.add(r);
            }
            else if (hash1 != null && hash2 != null && hash1.equals(hash2)) {
                dupeRemaining.add(r);
                LOG.debug("NOT eliminating");
            }
            else {
                LOG.debug("eliminating " + r);
            }
        }
        elimCount = dupeN.size() - dupeRemaining.size();
        //LOG.debug("estimateElimCount" + (System.currentTimeMillis() - ts));
    }
    /*public ListMap<Object, Row> getHashForDupes(List<R> d) {
        //dupeRemaining = new ArrayList<R>();
        ListMap<Object, Row> returnMap = new ListMap<Object, Row>();
        for (Row pair : d) {
            Tuple first = pair.getFirst();
            Tuple second = pair.getSecond();
            Object hash1 = apply(first);
            Object hash2 = apply(second);
            if (hash1 == null) {
                if (hash2 != null) {
                    //do nothing
                }
            } else {
                if (!hash1.equals(hash2)) {
                    // LOG.info("Elimination of " + pair);
                    //do nothing
                } else {
                    returnMap.add(hash1, pair);
                }
            }
        }
        return returnMap;
    }*/
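    /**
     * Copies the state of this canopy (function, context, duplicate lists,
     * elimination count, hash and training set) into the supplied canopy.
     * References are shared rather than cloned.
     *
     * @param copyTo the canopy to populate with this canopy's state
     * @return the supplied canopy
     */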
    public Canopy<R> copyTo(Canopy<R> copyTo) {
        copyTo.function = function;
        copyTo.context = context;
        // list of duplicates passed from parent
        copyTo.dupeN = dupeN;
        // number of duplicates eliminated after function applied on fn context
        copyTo.elimCount = elimCount;
        // hash of canopy
        copyTo.hash = hash;
        // training set
        copyTo.training = training;
        // duplicates remaining after function is applied
        copyTo.dupeRemaining = dupeRemaining;
        return copyTo;
    }
    /**
     * Clears the training set, the duplicate pairs passed from the parent and the
     * remaining duplicates so the canopy stays lightweight; to be called just
     * before the canopy is persisted to disk.
     */
    public void clearBeforeSaving() {
        this.training = null;
        // this.elimCount = null;
        this.dupeN = null;
        this.dupeRemaining = null;
    }
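    /*
     * A minimal usage sketch (not part of the class's API), assuming hypothetical
     * inputs: a list of training records, a list of labelled duplicate pairs, a
     * HashFunction "first2Chars" and a FieldDefinition "firstNameField":
     *
     *   Canopy<R> root = new Canopy<R>(trainingRecords, labelledDupePairs,
     *           first2Chars, firstNameField);
     *   root.estimateElimCount();              // how many labelled dupes this hash would split
     *   long buckets = root.estimateCanopies(); // number of distinct hash buckets
     *   List<Canopy<R>> children = root.getCanopies(); // split training per hash
     */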
}