Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fixed bug where unique results of null values failed to return #2354

Merged
merged 6 commits into from
May 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,10 @@
import java.util.Collection;
import java.util.HashSet;
import java.util.Iterator;
import java.util.NavigableSet;
import java.util.Objects;
import java.util.Set;
import java.util.SortedSet;

import org.apache.commons.lang.StringUtils;

Expand All @@ -28,7 +30,7 @@
*/
public class UniqueFields implements Serializable, Cloneable {

private final SortedSetMultimap<String,UniqueGranularity> fieldMap = TreeMultimap.create();
private final TreeMultimap<String,UniqueGranularity> fieldMap = TreeMultimap.create();
private boolean mostRecent = false;

/**
Expand Down Expand Up @@ -213,21 +215,21 @@ public void replace(String field, String replacement) {
}

/**
* Return a copy of the fields within this {@link UniqueFields}. Modifications to this set will not modify the fields in this {@link UniqueFields}.
* Return the fields within this {@link UniqueFields}. Modifications to this set will modify the fields in this {@link UniqueFields}.
*
* @return a copy of the fields
*/
public Set<String> getFields() {
return Sets.newHashSet(fieldMap.keySet());
public NavigableSet<String> getFields() {
return fieldMap.keySet();
}

/**
* Return the underlying field-granularity map from this {@link UniqueFields}.
*
* @return the field map
*/
public Multimap<String,UniqueGranularity> getFieldMap() {
return TreeMultimap.create(fieldMap);
public TreeMultimap<String,UniqueGranularity> getFieldMap() {
return fieldMap;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -644,7 +644,7 @@ private void addConfigBasedTransformers() throws QueryException {
if (getConfig().getUniqueFields() != null && !getConfig().getUniqueFields().isEmpty()) {
DocumentTransform alreadyExists = ((DocumentTransformer) this.transformerInstance).containsTransform(UniqueTransform.class);
if (alreadyExists != null) {
((UniqueTransform) alreadyExists).updateConfig(getConfig().getUniqueFields(), getQueryModel());
((UniqueTransform) alreadyExists).updateConfig(getConfig().getUniqueFields());
} else {
try {
// @formatter:off
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ public interface DocumentTransform extends Function<Map.Entry<Key,Document>,Map.
class DefaultDocumentTransform implements DocumentTransform {
protected Query settings;
protected MarkingFunctions markingFunctions;
protected long queryExecutionForPageStartTime;
protected long queryExecutionForPageStartTime = System.currentTimeMillis();

@Override
public void initialize(Query settings, MarkingFunctions markingFunctions) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import datawave.query.common.grouping.GroupingUtils;
import datawave.query.common.grouping.Groups;
import datawave.query.iterator.profile.FinalDocumentTrackingIterator;
import datawave.query.model.QueryModel;

/**
* GroupingTransform mimics GROUP BY with a COUNT in SQL. For the given fields, this transform will group into unique combinations of values and assign a count
Expand Down Expand Up @@ -91,6 +90,10 @@ public Entry<Key,Document> apply(@Nullable Entry<Key,Document> keyDocumentEntry)
return keyDocumentEntry;
}

if (keyDocumentEntry.getValue().isIntermediateResult()) {
return keyDocumentEntry;
}

keys.add(keyDocumentEntry.getKey());
log.trace("{} get list key counts for: {}", "web-server", keyDocumentEntry);
DocumentGrouper.group(keyDocumentEntry, groupFields, groups);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import java.io.Serializable;
import java.net.URI;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.Iterator;
Expand Down Expand Up @@ -39,10 +40,8 @@
import datawave.query.iterator.ivarator.IvaratorCacheDirConfig;
import datawave.query.iterator.profile.FinalDocumentTrackingIterator;
import datawave.query.model.QueryModel;
import datawave.query.tables.ShardQueryLogic;
import datawave.query.util.sortedset.ByteArrayComparator;
import datawave.query.util.sortedset.FileByteDocumentSortedSet;
import datawave.query.util.sortedset.FileKeySortedSet;
import datawave.query.util.sortedset.FileKeyValueSortedSet;
import datawave.query.util.sortedset.FileSortedSet;
import datawave.query.util.sortedset.HdfsBackedSortedSet;
Expand All @@ -59,7 +58,6 @@ public class UniqueTransform extends DocumentTransform.DefaultDocumentTransform

private BloomFilter<byte[]> bloom;
private UniqueFields uniqueFields = new UniqueFields();
private Multimap<String,String> modelMapping;
private HdfsBackedSortedSet<Entry<byte[],Document>> set;
private HdfsBackedSortedSet<Entry<Key,Document>> returnSet;
private Iterator<Entry<Key,Document>> setIterator;
Expand Down Expand Up @@ -89,40 +87,13 @@ public UniqueTransform(UniqueFields uniqueFields, long queryExecutionForPageTime
}
}

/*
* Create a new {@link UniqueTransform} that will use a bloom filter to return on those results that are unique per the uniqueFields. Special uniqueness can
* be requested for date/time fields (@see UniqueFields). The logic will be used to get a query model to include the reverse mappings in the unique field
* set
*
* @param logic The query logic from whih to pull the query model
*
* @param uniqueFields The unique fields
*
* @param queryExecutionForPageTimeout If this timeout is passed before since the last result was returned, then an "intermediate" result is returned
* denoting we are still looking for the next unique result.
*/
public UniqueTransform(ShardQueryLogic logic, UniqueFields uniqueFields, long queryExecutionForPageTimeout) {
this(uniqueFields, queryExecutionForPageTimeout);
QueryModel model = logic.getQueryModel();
if (model != null) {
modelMapping = HashMultimap.create();
// reverse the reverse query mapping which will give us a mapping from the final field name to the original field name(s)
for (Map.Entry<String,String> entry : model.getReverseQueryMapping().entrySet()) {
modelMapping.put(entry.getValue(), entry.getKey());
}
}
setModelMappings(model);
}

/**
* Update the configuration of this transform. If the configuration is actually changing, then the bloom filter will be reset as well.
*
* @param uniqueFields
* The new set of unique fields.
* @param model
* The query model
*/
public void updateConfig(UniqueFields uniqueFields, QueryModel model) {
public void updateConfig(UniqueFields uniqueFields) {
// only reset the bloom filter if changing the field set
if (!this.uniqueFields.equals(uniqueFields)) {
this.uniqueFields = uniqueFields.clone();
Expand All @@ -132,23 +103,6 @@ public void updateConfig(UniqueFields uniqueFields, QueryModel model) {
log.trace("unique fields: " + this.uniqueFields.getFields());
}
}
setModelMappings(model);
}

/**
* Set the query model from which the reverse query mappings are pulled.
*
* @param model
* The query model
*/
private void setModelMappings(QueryModel model) {
if (model != null) {
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This whole mapping is already done in the DefaultQueryPlanner when setting up the query fields. This was redundant.

modelMapping = HashMultimap.create();
// reverse the reverse query mapping which will give us a mapping from the final field name to the original field name(s)
for (Map.Entry<String,String> entry : model.getReverseQueryMapping().entrySet()) {
modelMapping.put(entry.getValue(), entry.getKey());
}
}
}

/**
Expand Down Expand Up @@ -177,6 +131,10 @@ public Entry<Key,Document> apply(@Nullable Entry<Key,Document> keyDocumentEntry)
return keyDocumentEntry;
}

if (keyDocumentEntry.getValue().isIntermediateResult()) {
return keyDocumentEntry;
}

Copy link
Collaborator Author

@ivakegg ivakegg Apr 19, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This was the critical piece to fix the null document issue.

try {
if (set != null) {
byte[] signature = getBytes(keyDocumentEntry.getValue());
Expand Down Expand Up @@ -280,50 +238,48 @@ byte[] getBytes(Document document) throws IOException {
* if we failed to generate the byte array
*/
private void outputSortedFieldValues(Document document, DataOutputStream output) throws IOException {
int count = 0;
String lastField = "";
List<String> values = new ArrayList<>();
Multimap<String,String> values = HashMultimap.create();
for (String documentField : new TreeSet<>(document.getDictionary().keySet())) {
String field = getUniqueField(documentField);
if (field != null) {
if (!field.equals(lastField)) {
count = dumpValues(count, lastField, values, output);
lastField = field;
}
addValues(field, document.get(documentField), values);
}
}
dumpValues(count, lastField, values, output);
// Always dump the fields in the same order (uniqueFields.getFields is a sorted collection)
for (String field : uniqueFields.getFields()) {
dumpValues(field, values.get(field), output);
}
output.flush();
}

/**
* Dump a list of values, sorted, to the data output stream
*
* @param count
* value count
* @param field
* a field
* @param values
* the list of values
* @param output
* the output stream
* @return The next field count
* @throws IOException
* for issues with read/write
*/
private int dumpValues(int count, String field, List<String> values, DataOutputStream output) throws IOException {
private void dumpValues(String field, Collection<String> values, DataOutputStream output) throws IOException {
String separator = "f-" + field + ":";
if (!values.isEmpty()) {
Collections.sort(values);
String separator = "f-" + field + '/' + (count++) + ":";
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Adding the count in here is not really useful.

for (String value : values) {
List<String> valueList = new ArrayList<>(values);
// always output values in sorted order.
Collections.sort(valueList);
for (String value : valueList) {
output.writeUTF(separator);
output.writeUTF(value);
separator = ",";
}
values.clear();
} else {
// dump at least a header for empty value sets to ensure we have some bytes to check against
// in the bloom filter.
output.writeUTF(separator);
}
return count;
}

/**
Expand All @@ -334,16 +290,16 @@ private int dumpValues(int count, String field, List<String> values, DataOutputS
* @param attribute
* The attribute
* @param values
* The list of values to be updated
* The map of values to be updated
*/
private void addValues(final String field, Attribute<?> attribute, List<String> values) {
private void addValues(final String field, Attribute<?> attribute, Multimap<String,String> values) {
if (attribute instanceof Attributes) {
// @formatter:off
((Attributes) attribute).getAttributes().stream()
.forEach(a -> addValues(field, a, values));
// @formatter:on
} else {
values.add(uniqueFields.transformValue(field, String.valueOf(attribute.getData())));
values.put(field, uniqueFields.transformValue(field, String.valueOf(attribute.getData())));
}
}

Expand Down Expand Up @@ -376,8 +332,7 @@ private String getFieldWithoutGrouping(String field) {
}

/**
* Return whether or not the provided document field is considered a case-insensitive match for the provided field, applying reverse model mappings if
* configured.
* Return whether or not the provided document field is considered a case-insensitive match for the provided field
*
* @param baseField
* The base field
Expand All @@ -386,9 +341,7 @@ private String getFieldWithoutGrouping(String field) {
* @return true if matching
*/
private boolean isMatchingField(String baseField, String field) {
baseField = baseField.toUpperCase();
field = field.toUpperCase();
return field.equals(baseField) || (modelMapping != null && modelMapping.get(field).contains(baseField));
return baseField.equalsIgnoreCase(field);
}

/**
Expand Down Expand Up @@ -573,10 +526,6 @@ public Builder withQueryExecutionForPageTimeout(long timeout) {
public UniqueTransform build() throws IOException {
UniqueTransform transform = new UniqueTransform(uniqueFields, queryExecutionForPageTimeout);

if (model != null) {
transform.setModelMappings(model);
}

if (transform.uniqueFields.isMostRecent()) {
// @formatter:off
// noinspection unchecked
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;

import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
Expand All @@ -25,6 +26,7 @@
import java.util.stream.StreamSupport;

import org.apache.accumulo.core.data.Key;
import org.apache.commons.collections.keyvalue.UnmodifiableMapEntry;
import org.apache.commons.collections4.Transformer;
import org.apache.commons.collections4.iterators.TransformIterator;
import org.apache.commons.lang.RandomStringUtils;
Expand All @@ -35,6 +37,7 @@
import com.google.common.collect.HashMultimap;
import com.google.common.collect.Maps;
import com.google.common.collect.Multimap;
import com.google.common.collect.SortedSetMultimap;
import com.google.common.collect.TreeMultimap;
import com.google.common.primitives.Longs;

Expand Down Expand Up @@ -481,6 +484,35 @@ public void testUniquenessWithTwoGroupsAndPartialGroups() {
assertOrderedFieldValues();
}

@Test
public void testFinalDocIgnored() {
SortedSetMultimap<String,UniqueGranularity> fieldMap = TreeMultimap.create();
fieldMap.put("FIELD", UniqueGranularity.ALL);
UniqueFields fields = new UniqueFields(fieldMap);
UniqueTransform transform = new UniqueTransform(fields, 10000000L);
Key key = new Key("shard", "dt\u0000uid", FinalDocumentTrackingIterator.MARKER_TEXT.toString());
Document doc = new Document();
Map.Entry<Key,Document> entry = new UnmodifiableMapEntry(key, doc);
for (int i = 0; i < 10; i++) {
assertTrue(entry == transform.apply(entry));
}
}

@Test
public void testIntermediateIgnored() {
SortedSetMultimap<String,UniqueGranularity> fieldMap = TreeMultimap.create();
fieldMap.put("FIELD", UniqueGranularity.ALL);
UniqueFields fields = new UniqueFields(fieldMap);
UniqueTransform transform = new UniqueTransform(fields, 10000000L);
Key key = new Key("shard", "dt\u0000uid");
Document doc = new Document();
doc.setIntermediateResult(true);
Map.Entry<Key,Document> entry = new UnmodifiableMapEntry(key, doc);
for (int i = 0; i < 10; i++) {
assertTrue(entry == transform.apply(entry));
}
}

protected void assertUniqueDocuments() {
List<Document> actual = getUniqueDocumentsWithUpdateConfigCalls(inputDocuments);
Collections.sort(expectedUniqueDocuments);
Expand Down Expand Up @@ -542,7 +574,7 @@ protected UniqueTransform getUniqueTransform() {
}

protected void updateUniqueTransform(UniqueTransform uniqueTransform) {
uniqueTransform.updateConfig(uniqueFields, null);
uniqueTransform.updateConfig(uniqueFields);
}

protected InputDocumentBuilder givenInputDocument() {
Expand Down Expand Up @@ -636,13 +668,16 @@ public byte[] build() {
try {
ByteArrayOutputStream bytes = new ByteArrayOutputStream();
DataOutputStream output = new DataOutputStream(bytes);
int count = 0;
for (String field : fieldValues.keySet()) {
String separator = "f-" + field + '/' + (count++) + ":";
for (String value : fieldValues.get(field)) {
String separator = "f-" + field + ":";
if (fieldValues.isEmpty()) {
output.writeUTF(separator);
output.writeUTF(value);
separator = ",";
} else {
for (String value : fieldValues.get(field)) {
output.writeUTF(separator);
output.writeUTF(value);
separator = ",";
}
}
}
output.flush();
Expand Down