Fixed bug where unique results of null values failed to return (#2354)
* The intermediate results mechanism, which was fixed along with the
  most recent unique mechanism, introduced this bug
* Added tests to ensure FinalDoc and intermediate results are passed
  directly through
* Fixed javadoc
ivakegg committed May 10, 2024
1 parent 0046319 commit 15f08a3
Showing 6 changed files with 81 additions and 92 deletions.
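In short, both UniqueTransform and GroupingTransform now short-circuit documents flagged as intermediate results (and the final-document marker entries) before any uniqueness or grouping work is attempted. Below is a minimal sketch of that pass-through check, reusing the Document#isIntermediateResult() accessor visible in the diffs; the wrapper class and the Document import location are illustrative assumptions, not project code.

import java.util.Map;

import org.apache.accumulo.core.data.Key;

import datawave.query.attributes.Document; // assumed package for Document

public class PassThroughSketch {

    /**
     * Returns marker/intermediate entries untouched so they reach the caller,
     * instead of being fed into the uniqueness (bloom filter) or grouping logic.
     */
    public Map.Entry<Key,Document> apply(Map.Entry<Key,Document> entry) {
        if (entry == null) {
            return null;
        }
        if (entry.getValue().isIntermediateResult()) {
            return entry; // passed directly through, as the new tests verify
        }
        // ... normal unique/grouping processing of the document would follow ...
        return entry;
    }
}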
@@ -4,8 +4,10 @@
import java.util.Collection;
import java.util.HashSet;
import java.util.Iterator;
import java.util.NavigableSet;
import java.util.Objects;
import java.util.Set;
import java.util.SortedSet;

import org.apache.commons.lang.StringUtils;

@@ -28,7 +30,7 @@
*/
public class UniqueFields implements Serializable, Cloneable {

private final SortedSetMultimap<String,UniqueGranularity> fieldMap = TreeMultimap.create();
private final TreeMultimap<String,UniqueGranularity> fieldMap = TreeMultimap.create();
private boolean mostRecent = false;

/**
@@ -213,21 +215,21 @@ public void replace(String field, String replacement) {
}

/**
* Return a copy of the fields within this {@link UniqueFields}. Modifications to this set will not modify the fields in this {@link UniqueFields}.
* Return the fields within this {@link UniqueFields}. Modifications to this set will modify the fields in this {@link UniqueFields}.
*
* @return a copy of the fields
*/
public Set<String> getFields() {
return Sets.newHashSet(fieldMap.keySet());
public NavigableSet<String> getFields() {
return fieldMap.keySet();
}

/**
* Return the underlying field-granularity map from this {@link UniqueFields}.
*
* @return the field map
*/
public Multimap<String,UniqueGranularity> getFieldMap() {
return TreeMultimap.create(fieldMap);
public TreeMultimap<String,UniqueGranularity> getFieldMap() {
return fieldMap;
}

/**
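Note that the UniqueFields change above is behavioral, not just cosmetic: getFields() now returns the live key view of the backing TreeMultimap rather than a defensive copy, and getFieldMap() returns the multimap itself. The following is a hedged sketch of what that means for callers, reusing the UniqueFields(SortedSetMultimap) constructor and UniqueGranularity.ALL that appear in the tests later in this commit; the package names in the imports are assumptions.

import java.util.NavigableSet;

import com.google.common.collect.Sets;
import com.google.common.collect.SortedSetMultimap;
import com.google.common.collect.TreeMultimap;

import datawave.query.attributes.UniqueFields;      // assumed package
import datawave.query.attributes.UniqueGranularity; // assumed package

public class UniqueFieldsViewSketch {
    public static void main(String[] args) {
        SortedSetMultimap<String,UniqueGranularity> fieldMap = TreeMultimap.create();
        fieldMap.put("FIELD_A", UniqueGranularity.ALL);
        fieldMap.put("FIELD_B", UniqueGranularity.ALL);
        UniqueFields uniqueFields = new UniqueFields(fieldMap);

        // The returned set is backed by the internal TreeMultimap: removing a key
        // here also drops its granularity mappings from the UniqueFields.
        NavigableSet<String> fields = uniqueFields.getFields();
        fields.remove("FIELD_B");
        System.out.println(uniqueFields.getFieldMap().containsKey("FIELD_B")); // false

        // Callers that relied on the old defensive copy should now copy explicitly.
        NavigableSet<String> snapshot = Sets.newTreeSet(uniqueFields.getFields());
        System.out.println(snapshot);
    }
}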
@@ -644,7 +644,7 @@ private void addConfigBasedTransformers() throws QueryException {
if (getConfig().getUniqueFields() != null && !getConfig().getUniqueFields().isEmpty()) {
DocumentTransform alreadyExists = ((DocumentTransformer) this.transformerInstance).containsTransform(UniqueTransform.class);
if (alreadyExists != null) {
((UniqueTransform) alreadyExists).updateConfig(getConfig().getUniqueFields(), getQueryModel());
((UniqueTransform) alreadyExists).updateConfig(getConfig().getUniqueFields());
} else {
try {
// @formatter:off
@@ -31,7 +31,7 @@ public interface DocumentTransform extends Function<Map.Entry<Key,Document>,Map.
class DefaultDocumentTransform implements DocumentTransform {
protected Query settings;
protected MarkingFunctions markingFunctions;
protected long queryExecutionForPageStartTime;
protected long queryExecutionForPageStartTime = System.currentTimeMillis();

@Override
public void initialize(Query settings, MarkingFunctions markingFunctions) {
@@ -23,7 +23,6 @@
import datawave.query.common.grouping.GroupingUtils;
import datawave.query.common.grouping.Groups;
import datawave.query.iterator.profile.FinalDocumentTrackingIterator;
import datawave.query.model.QueryModel;

/**
* GroupingTransform mimics GROUP BY with a COUNT in SQL. For the given fields, this transform will group into unique combinations of values and assign a count
@@ -91,6 +90,10 @@ public Entry<Key,Document> apply(@Nullable Entry<Key,Document> keyDocumentEntry)
return keyDocumentEntry;
}

if (keyDocumentEntry.getValue().isIntermediateResult()) {
return keyDocumentEntry;
}

keys.add(keyDocumentEntry.getKey());
log.trace("{} get list key counts for: {}", "web-server", keyDocumentEntry);
DocumentGrouper.group(keyDocumentEntry, groupFields, groups);
@@ -6,6 +6,7 @@
import java.io.Serializable;
import java.net.URI;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.Iterator;
@@ -39,10 +40,8 @@
import datawave.query.iterator.ivarator.IvaratorCacheDirConfig;
import datawave.query.iterator.profile.FinalDocumentTrackingIterator;
import datawave.query.model.QueryModel;
import datawave.query.tables.ShardQueryLogic;
import datawave.query.util.sortedset.ByteArrayComparator;
import datawave.query.util.sortedset.FileByteDocumentSortedSet;
import datawave.query.util.sortedset.FileKeySortedSet;
import datawave.query.util.sortedset.FileKeyValueSortedSet;
import datawave.query.util.sortedset.FileSortedSet;
import datawave.query.util.sortedset.HdfsBackedSortedSet;
@@ -59,7 +58,6 @@ public class UniqueTransform extends DocumentTransform.DefaultDocumentTransform

private BloomFilter<byte[]> bloom;
private UniqueFields uniqueFields = new UniqueFields();
private Multimap<String,String> modelMapping;
private HdfsBackedSortedSet<Entry<byte[],Document>> set;
private HdfsBackedSortedSet<Entry<Key,Document>> returnSet;
private Iterator<Entry<Key,Document>> setIterator;
@@ -89,40 +87,13 @@ public UniqueTransform(UniqueFields uniqueFields, long queryExecutionForPageTime
}
}

/*
* Create a new {@link UniqueTransform} that will use a bloom filter to return on those results that are unique per the uniqueFields. Special uniqueness can
* be requested for date/time fields (@see UniqueFields). The logic will be used to get a query model to include the reverse mappings in the unique field
* set
*
* @param logic The query logic from whih to pull the query model
*
* @param uniqueFields The unique fields
*
* @param queryExecutionForPageTimeout If this timeout is passed before since the last result was returned, then an "intermediate" result is returned
* denoting we are still looking for the next unique result.
*/
public UniqueTransform(ShardQueryLogic logic, UniqueFields uniqueFields, long queryExecutionForPageTimeout) {
this(uniqueFields, queryExecutionForPageTimeout);
QueryModel model = logic.getQueryModel();
if (model != null) {
modelMapping = HashMultimap.create();
// reverse the reverse query mapping which will give us a mapping from the final field name to the original field name(s)
for (Map.Entry<String,String> entry : model.getReverseQueryMapping().entrySet()) {
modelMapping.put(entry.getValue(), entry.getKey());
}
}
setModelMappings(model);
}

/**
* Update the configuration of this transform. If the configuration is actually changing, then the bloom filter will be reset as well.
*
* @param uniqueFields
* The new set of unique fields.
* @param model
* The query model
*/
public void updateConfig(UniqueFields uniqueFields, QueryModel model) {
public void updateConfig(UniqueFields uniqueFields) {
// only reset the bloom filter if changing the field set
if (!this.uniqueFields.equals(uniqueFields)) {
this.uniqueFields = uniqueFields.clone();
@@ -132,23 +103,6 @@ public void updateConfig(UniqueFields uniqueFields, QueryModel model) {
log.trace("unique fields: " + this.uniqueFields.getFields());
}
}
setModelMappings(model);
}

/**
* Set the query model from which the reverse query mappings are pulled.
*
* @param model
* The query model
*/
private void setModelMappings(QueryModel model) {
if (model != null) {
modelMapping = HashMultimap.create();
// reverse the reverse query mapping which will give us a mapping from the final field name to the original field name(s)
for (Map.Entry<String,String> entry : model.getReverseQueryMapping().entrySet()) {
modelMapping.put(entry.getValue(), entry.getKey());
}
}
}

/**
@@ -177,6 +131,10 @@ public Entry<Key,Document> apply(@Nullable Entry<Key,Document> keyDocumentEntry)
return keyDocumentEntry;
}

if (keyDocumentEntry.getValue().isIntermediateResult()) {
return keyDocumentEntry;
}

try {
if (set != null) {
byte[] signature = getBytes(keyDocumentEntry.getValue());
@@ -280,50 +238,48 @@ byte[] getBytes(Document document) throws IOException {
* if we failed to generate the byte array
*/
private void outputSortedFieldValues(Document document, DataOutputStream output) throws IOException {
int count = 0;
String lastField = "";
List<String> values = new ArrayList<>();
Multimap<String,String> values = HashMultimap.create();
for (String documentField : new TreeSet<>(document.getDictionary().keySet())) {
String field = getUniqueField(documentField);
if (field != null) {
if (!field.equals(lastField)) {
count = dumpValues(count, lastField, values, output);
lastField = field;
}
addValues(field, document.get(documentField), values);
}
}
dumpValues(count, lastField, values, output);
// Always dump the fields in the same order (uniqueFields.getFields is a sorted collection)
for (String field : uniqueFields.getFields()) {
dumpValues(field, values.get(field), output);
}
output.flush();
}

/**
* Dump a list of values, sorted, to the data output stream
*
* @param count
* value count
* @param field
* a field
* @param values
* the list of values
* @param output
* the output stream
* @return The next field count
* @throws IOException
* for issues with read/write
*/
private int dumpValues(int count, String field, List<String> values, DataOutputStream output) throws IOException {
private void dumpValues(String field, Collection<String> values, DataOutputStream output) throws IOException {
String separator = "f-" + field + ":";
if (!values.isEmpty()) {
Collections.sort(values);
String separator = "f-" + field + '/' + (count++) + ":";
for (String value : values) {
List<String> valueList = new ArrayList<>(values);
// always output values in sorted order.
Collections.sort(valueList);
for (String value : valueList) {
output.writeUTF(separator);
output.writeUTF(value);
separator = ",";
}
values.clear();
} else {
// dump at least a header for empty value sets to ensure we have some bytes to check against
// in the bloom filter.
output.writeUTF(separator);
}
return count;
}

/**
@@ -334,16 +290,16 @@ private int dumpValues(int count, String field, List<String> values, DataOutputS
* @param attribute
* The attribute
* @param values
* The list of values to be updated
* The map of values to be updated
*/
private void addValues(final String field, Attribute<?> attribute, List<String> values) {
private void addValues(final String field, Attribute<?> attribute, Multimap<String,String> values) {
if (attribute instanceof Attributes) {
// @formatter:off
((Attributes) attribute).getAttributes().stream()
.forEach(a -> addValues(field, a, values));
// @formatter:on
} else {
values.add(uniqueFields.transformValue(field, String.valueOf(attribute.getData())));
values.put(field, uniqueFields.transformValue(field, String.valueOf(attribute.getData())));
}
}

@@ -376,8 +332,7 @@ private String getFieldWithoutGrouping(String field) {
}

/**
* Return whether or not the provided document field is considered a case-insensitive match for the provided field, applying reverse model mappings if
* configured.
* Return whether or not the provided document field is considered a case-insensitive match for the provided field
*
* @param baseField
* The base field
@@ -386,9 +341,7 @@ private String getFieldWithoutGrouping(String field) {
* @return true if matching
*/
private boolean isMatchingField(String baseField, String field) {
baseField = baseField.toUpperCase();
field = field.toUpperCase();
return field.equals(baseField) || (modelMapping != null && modelMapping.get(field).contains(baseField));
return baseField.equalsIgnoreCase(field);
}

/**
@@ -573,10 +526,6 @@ public Builder withQueryExecutionForPageTimeout(long timeout) {
public UniqueTransform build() throws IOException {
UniqueTransform transform = new UniqueTransform(uniqueFields, queryExecutionForPageTimeout);

if (model != null) {
transform.setModelMappings(model);
}

if (transform.uniqueFields.isMostRecent()) {
// @formatter:off
// noinspection unchecked
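For reference, the signature layout produced by the reworked outputSortedFieldValues/dumpValues above can be reproduced in isolation. The following is a hedged, standalone sketch (the class, field names, and values are illustrative rather than project code); the key point of the fix is that a field with an empty value set still writes its "f-FIELD:" header, so documents whose unique fields are null or absent still contribute distinct bytes to the bloom-filter signature.

import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.TreeSet;

import com.google.common.collect.HashMultimap;
import com.google.common.collect.Multimap;

public class SignatureFormatSketch {

    // Mirrors the shape of the new dumpValues: header first, then the values in
    // sorted order separated by commas; an empty value set still emits the header.
    static void dumpValues(String field, Collection<String> values, DataOutputStream output) throws IOException {
        String separator = "f-" + field + ":";
        if (!values.isEmpty()) {
            List<String> sorted = new ArrayList<>(values);
            Collections.sort(sorted);
            for (String value : sorted) {
                output.writeUTF(separator);
                output.writeUTF(value);
                separator = ",";
            }
        } else {
            output.writeUTF(separator);
        }
    }

    public static void main(String[] args) throws IOException {
        // Values grouped per unique field, as the transform now collects them.
        Multimap<String,String> values = HashMultimap.create();
        values.put("FIELD_A", "v2");
        values.put("FIELD_A", "v1");
        // FIELD_B deliberately has no values (e.g. the field is absent/null in the document).

        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        DataOutputStream output = new DataOutputStream(bytes);
        // Fields are always written in the same sorted order, mirroring uniqueFields.getFields().
        for (String field : new TreeSet<>(Arrays.asList("FIELD_A", "FIELD_B"))) {
            dumpValues(field, values.get(field), output);
        }
        output.flush();
        // Resulting writeUTF records: "f-FIELD_A:", "v1", ",", "v2", "f-FIELD_B:"
        System.out.println(bytes.size() + " bytes written");
    }
}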
@@ -2,6 +2,7 @@

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;

import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
@@ -25,6 +26,7 @@
import java.util.stream.StreamSupport;

import org.apache.accumulo.core.data.Key;
import org.apache.commons.collections.keyvalue.UnmodifiableMapEntry;
import org.apache.commons.collections4.Transformer;
import org.apache.commons.collections4.iterators.TransformIterator;
import org.apache.commons.lang.RandomStringUtils;
@@ -35,6 +37,7 @@
import com.google.common.collect.HashMultimap;
import com.google.common.collect.Maps;
import com.google.common.collect.Multimap;
import com.google.common.collect.SortedSetMultimap;
import com.google.common.collect.TreeMultimap;
import com.google.common.primitives.Longs;

@@ -481,6 +484,35 @@ public void testUniquenessWithTwoGroupsAndPartialGroups() {
assertOrderedFieldValues();
}

@Test
public void testFinalDocIgnored() {
SortedSetMultimap<String,UniqueGranularity> fieldMap = TreeMultimap.create();
fieldMap.put("FIELD", UniqueGranularity.ALL);
UniqueFields fields = new UniqueFields(fieldMap);
UniqueTransform transform = new UniqueTransform(fields, 10000000L);
Key key = new Key("shard", "dt\u0000uid", FinalDocumentTrackingIterator.MARKER_TEXT.toString());
Document doc = new Document();
Map.Entry<Key,Document> entry = new UnmodifiableMapEntry(key, doc);
for (int i = 0; i < 10; i++) {
assertTrue(entry == transform.apply(entry));
}
}

@Test
public void testIntermediateIgnored() {
SortedSetMultimap<String,UniqueGranularity> fieldMap = TreeMultimap.create();
fieldMap.put("FIELD", UniqueGranularity.ALL);
UniqueFields fields = new UniqueFields(fieldMap);
UniqueTransform transform = new UniqueTransform(fields, 10000000L);
Key key = new Key("shard", "dt\u0000uid");
Document doc = new Document();
doc.setIntermediateResult(true);
Map.Entry<Key,Document> entry = new UnmodifiableMapEntry(key, doc);
for (int i = 0; i < 10; i++) {
assertTrue(entry == transform.apply(entry));
}
}

protected void assertUniqueDocuments() {
List<Document> actual = getUniqueDocumentsWithUpdateConfigCalls(inputDocuments);
Collections.sort(expectedUniqueDocuments);
@@ -542,7 +574,7 @@ protected UniqueTransform getUniqueTransform() {
}

protected void updateUniqueTransform(UniqueTransform uniqueTransform) {
uniqueTransform.updateConfig(uniqueFields, null);
uniqueTransform.updateConfig(uniqueFields);
}

protected InputDocumentBuilder givenInputDocument() {
@@ -636,13 +668,16 @@ public byte[] build() {
try {
ByteArrayOutputStream bytes = new ByteArrayOutputStream();
DataOutputStream output = new DataOutputStream(bytes);
int count = 0;
for (String field : fieldValues.keySet()) {
String separator = "f-" + field + '/' + (count++) + ":";
for (String value : fieldValues.get(field)) {
String separator = "f-" + field + ":";
if (fieldValues.isEmpty()) {
output.writeUTF(separator);
output.writeUTF(value);
separator = ",";
} else {
for (String value : fieldValues.get(field)) {
output.writeUTF(separator);
output.writeUTF(value);
separator = ",";
}
}
}
output.flush();
