Add AttributeRebuilder for enriching return types when reducing type metadata
apmoriarty committed Apr 2, 2024
1 parent 0e1f2f3 commit c86e590
Showing 11 changed files with 919 additions and 10 deletions.
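In short: when type metadata reduction is enabled, the TypeMetadata shipped with the query is trimmed to the fields the query actually uses, so attributes for trimmed fields deserialize with a basic NoOp type; the new AttributeRebuilder restores the correct types as documents pass through the transformer. Below is a condensed sketch of the wiring, based on the ShardQueryLogic hunk in this commit (error handling omitted; the helper, datatype filter, and query model are assumed to be supplied by the caller, and the parameter types are assumptions):

import java.util.Set;

import datawave.query.model.QueryModel;
import datawave.query.util.MetadataHelper;
import datawave.query.util.TypeMetadata;
import datawave.query.util.transformer.AttributeRebuilder;

class RebuilderWiringSketch {
    // Condensed from ShardQueryLogic#addConfigBasedTransformers; error handling omitted.
    AttributeRebuilder buildRebuilder(MetadataHelper helper, Set<String> datatypeFilter, QueryModel queryModel) throws Exception {
        // full (unreduced) type metadata, fetched client-side
        TypeMetadata typeMetadata = helper.getTypeMetadata(datatypeFilter);
        // the query model may be null; the rebuilder then skips reverse-model lookups
        AttributeRebuilder rebuilder = new AttributeRebuilder(typeMetadata, queryModel);
        // ShardQueryLogic then hands this to the transformer:
        // ((DocumentTransformer) transformerInstance).setAttributeRebuilder(rebuilder);
        return rebuilder;
    }
}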
@@ -2387,6 +2387,12 @@ public static void configureTypeMappings(ShardQueryConfiguration config, Iterato

if (config.getReduceTypeMetadata() && !isPreload) {
Set<String> fieldsToRetain = ReduceFields.getQueryFields(config.getQueryTree());
// add projection fields
fieldsToRetain.addAll(config.getProjectFields());
// add both halves of each composite field mapping
for (Entry<String,String> entry : config.getCompositeToFieldMap().entries()) {
fieldsToRetain.add(entry.getKey());
fieldsToRetain.add(entry.getValue());
}

typeMetadata = typeMetadata.reduce(fieldsToRetain);
}

@@ -83,6 +83,8 @@
import datawave.query.util.MetadataHelper;
import datawave.query.util.MetadataHelperFactory;
import datawave.query.util.QueryStopwatch;
import datawave.query.util.TypeMetadata;
import datawave.query.util.transformer.AttributeRebuilder;
import datawave.util.time.TraceStopwatch;
import datawave.webservice.common.connection.AccumuloConnectionFactory;
import datawave.webservice.common.logging.ThreadConfigurableLogger;
@@ -367,6 +369,7 @@ public void initialize(ShardQueryConfiguration config, AccumuloClient client, Qu
config.setAuthorizations(auths);
config.setMaxScannerBatchSize(getMaxScannerBatchSize());
config.setMaxIndexBatchSize(getMaxIndexBatchSize());
setConfig(config);

setScannerFactory(new ScannerFactory(config));

@@ -665,6 +668,29 @@ private void addConfigBasedTransformers() {
if (getQueryModel() != null) {
((DocumentTransformer) this.transformerInstance).setQm(getQueryModel());
}

if (getReduceTypeMetadata() || getReduceTypeMetadataPerShard()) {
try {
MetadataHelperFactory factory = getMetadataHelperFactory();
MetadataHelper helper = factory.createMetadataHelper(getConfig().getClient(), getMetadataTableName(), getConfig().getAuthorizations());
TypeMetadata typeMetadata = helper.getTypeMetadata(getConfig().getDatatypeFilter());

if (config.getQueryModel() == null) {
if (queryModel != null) {
config.setQueryModel(queryModel);
} else {
loadQueryModel(helper, config);
}
}

AttributeRebuilder rebuilder = new AttributeRebuilder(typeMetadata, getQueryModel());
((DocumentTransformer) this.transformerInstance).setAttributeRebuilder(rebuilder);
} catch (TableNotFoundException | InstantiationException | IllegalAccessException | ExecutionException e) {
log.error("Could not build type metadata for responses, disabling type metadata reduction", e);
setReduceTypeMetadata(false);
setReduceTypeMetadataPerShard(false);
}
}
}

public void setPageProcessingStartTime(long pageProcessingStartTime) {
@@ -31,11 +31,14 @@

import com.google.common.base.Function;
import com.google.common.base.Joiner;
import com.google.common.base.Splitter;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.google.common.collect.Multimap;
import com.google.common.collect.Sets;

import datawave.core.iterators.filesystem.FileSystemCache;
import datawave.query.composite.CompositeMetadata;
import datawave.query.config.ShardQueryConfiguration;
import datawave.query.exceptions.DatawaveFatalQueryException;
import datawave.query.exceptions.InvalidQueryException;
@@ -459,6 +462,23 @@ private void reduceTypeMetadata(ASTJexlScript script, IteratorSetting newIterato
TypeMetadata typeMetadata = new TypeMetadata(serializedTypeMetadata);

Set<String> fieldsToRetain = ReduceFields.getQueryFields(script);

// add projection fields
if (newIteratorSetting.getOptions().containsKey(QueryOptions.PROJECTION_FIELDS)) {
String opt = newIteratorSetting.getOptions().get(QueryOptions.PROJECTION_FIELDS);
fieldsToRetain.addAll(Splitter.on(',').splitToList(opt));
}

// add composite fields
if (newIteratorSetting.getOptions().containsKey(QueryOptions.COMPOSITE_METADATA)) {
String opt = newIteratorSetting.getOptions().get(QueryOptions.COMPOSITE_METADATA);
CompositeMetadata compositeMetadata = CompositeMetadata.fromBytes(java.util.Base64.getDecoder().decode(opt));
for (Multimap<String,String> multimap : compositeMetadata.getCompositeFieldMapByType().values()) {
fieldsToRetain.addAll(multimap.keySet());
fieldsToRetain.addAll(multimap.values());
}
}

typeMetadata = typeMetadata.reduce(fieldsToRetain);

serializedTypeMetadata = typeMetadata.toString();
@@ -488,6 +508,10 @@
private void reduceIngestTypes(ASTJexlScript script, IteratorSetting newIteratorSetting) {
if (cachedTypeMetadata == null) {
String serializedTypeMetadata = newIteratorSetting.getOptions().get(QueryOptions.TYPE_METADATA);
if (org.apache.commons.lang3.StringUtils.isBlank(serializedTypeMetadata)) {
log.warn("Could not deserialize type metadata, will not attempt datatype reduction");
return;
}
cachedTypeMetadata = new TypeMetadata(serializedTypeMetadata);
}

@@ -40,6 +40,7 @@
import datawave.query.iterator.QueryOptions;
import datawave.query.iterator.profile.QuerySpan;
import datawave.query.jexl.JexlASTHelper;
import datawave.query.util.transformer.AttributeRebuilder;
import datawave.util.StringUtils;
import datawave.util.time.DateHelper;
import datawave.webservice.query.Query;
@@ -85,6 +86,9 @@ public abstract class DocumentTransformerSupport<I,O> extends EventQueryTransfor

protected List<DocumentTransform> transforms = new ArrayList<>();

// used to transform attributes when type metadata reduction is enabled
protected AttributeRebuilder rebuilder;

/*
* The 'HIT_TERM' feature required that an attribute value also contain the attribute's field name. The current implementation does it by prepending the
* field name to the value with a colon separator, like so: BUDDY:fred. In the case where a data model has been applied to the query, the
@@ -108,19 +112,19 @@ public abstract class DocumentTransformerSupport<I,O> extends EventQueryTransfor
* @param responseObjectFactory
* the response object factory
*/
public DocumentTransformerSupport(BaseQueryLogic<Entry<Key,Value>> logic, Query settings, MarkingFunctions markingFunctions,
protected DocumentTransformerSupport(BaseQueryLogic<Entry<Key,Value>> logic, Query settings, MarkingFunctions markingFunctions,
ResponseObjectFactory responseObjectFactory) {
this(logic, settings, markingFunctions, responseObjectFactory, false);
}

public DocumentTransformerSupport(BaseQueryLogic<Entry<Key,Value>> logic, Query settings, MarkingFunctions markingFunctions,
protected DocumentTransformerSupport(BaseQueryLogic<Entry<Key,Value>> logic, Query settings, MarkingFunctions markingFunctions,
ResponseObjectFactory responseObjectFactory, Boolean reducedResponse) {

this(null != logic ? logic.getTableName() : null, settings, markingFunctions, responseObjectFactory, reducedResponse);
this.logic = logic;
}

public DocumentTransformerSupport(String tableName, Query settings, MarkingFunctions markingFunctions, ResponseObjectFactory responseObjectFactory,
protected DocumentTransformerSupport(String tableName, Query settings, MarkingFunctions markingFunctions, ResponseObjectFactory responseObjectFactory,
Boolean reducedResponse) {
super(tableName, settings, markingFunctions, responseObjectFactory);

@@ -196,7 +200,7 @@ protected Collection<FieldBase<?>> buildDocumentFields(Key documentKey, String d
}
}

Set<FieldBase<?>> Fields = new HashSet<>();
Set<FieldBase<?>> fields = new HashSet<>();
final Map<String,Attribute<? extends Comparable<?>>> documentData = document.getDictionary();

String fn = null;
@@ -217,17 +221,17 @@ protected Collection<FieldBase<?>> buildDocumentFields(Key documentKey, String d
fn = this.getQm().aliasFieldNameReverseModel(fn);
}
attribute = data.getValue();
Fields.addAll(buildDocumentFields(documentKey, fn, attribute, topLevelColumnVisibility, markingFunctions));
fields.addAll(buildDocumentFields(documentKey, fn, attribute, topLevelColumnVisibility, markingFunctions));
}
}
return Fields;
return fields;
}

protected void extractMetrics(Document document, Key documentKey) {

Map<String,Attribute<? extends Comparable<?>>> dictionary = document.getDictionary();
Attribute<? extends Comparable<?>> timingMetadataAttribute = dictionary.get(LogTiming.TIMING_METADATA);
if (timingMetadataAttribute != null && timingMetadataAttribute instanceof TimingMetadata) {
if (timingMetadataAttribute instanceof TimingMetadata) {
TimingMetadata timingMetadata = (TimingMetadata) timingMetadataAttribute;
long currentSourceCount = timingMetadata.getSourceCount();
long currentNextCount = timingMetadata.getNextCount();
@@ -312,7 +316,7 @@ protected void collectCardinalities(Document document, Key documentKey, String u
String uidField = cardinalityConfiguration.getCardinalityUidField();
if (org.apache.commons.lang.StringUtils.isNotBlank(uidField)) {
List<String> documentUidValues = getFieldValues(document, uidField, true);
if (documentUidValues.isEmpty() == false) {
if (!documentUidValues.isEmpty()) {
eventId = documentUidValues.get(0);
}
}
@@ -413,6 +417,10 @@ protected Collection<FieldBase<?>> buildDocumentFields(Key documentKey, String f
// Use the markings on the Field if we're returning the markings to the client
if (!this.reducedResponse) {
try {
if (rebuilder != null) {
attr = rebuilder.rebuild(fieldName, attr);
}

Map<String,String> markings = markingFunctions.translateFromColumnVisibility(attr.getColumnVisibility());
FieldBase<?> field = this.makeField(fieldName, markings, attr.getColumnVisibility(), attr.getTimestamp(), attr.getData());
MarkingFunctions.Util.populate(field, markings);
@@ -607,4 +615,14 @@ public void setDisallowlistedFields(Set<String> disallowlistedFields) {
public void setPrimaryToSecondaryFieldMap(Map<String,List<String>> primaryToSecondaryFieldMap) {
addTransform(new FieldMappingTransform(primaryToSecondaryFieldMap, false, reducedResponse));
}

/**
* Set an {@link AttributeRebuilder}, required when reducing type metadata
*
* @param rebuilder
* an AttributeRebuilder
*/
public void setAttributeRebuilder(AttributeRebuilder rebuilder) {
this.rebuilder = rebuilder;
}
}
@@ -151,7 +151,6 @@ public BaseQueryResponse createResponse(List<Object> resultList) {
fieldSet.add(f.getName());
}
eventList.add(e);

}
response.setFields(Lists.newArrayList(fieldSet));
response.setEvents(eventList);
@@ -0,0 +1,124 @@
package datawave.query.util.transformer;

import java.util.HashMap;
import java.util.Map;
import java.util.Set;

import javax.annotation.Nullable;

import org.apache.log4j.Logger;

import datawave.data.type.Type;
import datawave.query.attributes.Attribute;
import datawave.query.attributes.TypeAttribute;
import datawave.query.jexl.JexlASTHelper;
import datawave.query.model.QueryModel;
import datawave.query.util.TypeMetadata;

/**
* Utility class that handles rebuilding a basic NoOp attribute into an attribute of the correct type, given TypeMetadata and optionally a QueryModel
*/
public class AttributeRebuilder {
private static final Logger log = Logger.getLogger(AttributeRebuilder.class);
private final TypeMetadata typeMetadata;
private final Map<String,String> fieldMap;
private final Map<String,Class<?>> classCache;

/**
* Main constructor for the rebuilder; the QueryModel may be null
*
* @param typeMetadata
* the type metadata used to look up normalizer names by field
* @param queryModel
* the query model used for reverse field mappings, may be null
*/
public AttributeRebuilder(TypeMetadata typeMetadata, @Nullable QueryModel queryModel) {
this.typeMetadata = typeMetadata;
if (queryModel == null) {
this.fieldMap = new HashMap<>();
} else {
this.fieldMap = invertMap(queryModel.getReverseQueryMapping());
}
this.classCache = new HashMap<>();
}

private Map<String,String> invertMap(Map<String,String> map) {
Map<String,String> mappings = new HashMap<>();
for (Map.Entry<String,String> entry : map.entrySet()) {
mappings.put(entry.getValue(), entry.getKey());
}
return mappings;
}

/**
* Given a field and an attribute, return the correctly typed attribute
*
* @param field
* the field
* @param attr
* the attribute
* @return an attribute of the correct type
*/
public Attribute rebuild(String field, Attribute<?> attr) {
field = JexlASTHelper.deconstructIdentifier(field);
Set<String> normalizerNames = typeMetadata.getNormalizerNamesForField(field);

if (normalizerNames.isEmpty()) {
populateNormalizerFromQueryModel(field, normalizerNames);
}

if (normalizerNames.size() > 1 && log.isTraceEnabled()) {
log.trace("Found " + normalizerNames.size() + " normalizers for field " + field + ", using first normalizer");
}

for (String name : normalizerNames) {
try {
Class<?> clazz = getClass(name);
Type<?> type = (Type<?>) clazz.getDeclaredConstructor().newInstance();

type.setDelegateFromString(String.valueOf(attr.getData()));
return new TypeAttribute<>(type, attr.getMetadata(), attr.isToKeep());
} catch (Exception e) {
throw new RuntimeException(e);
}
}

return attr;
}

/**
* Populate normalizer names from the query model
*
* @param field
* the field
* @param normalizerNames
* the set of normalizer names
*/
private void populateNormalizerFromQueryModel(String field, Set<String> normalizerNames) {
if (log.isTraceEnabled()) {
log.trace("Field " + field + " not found in TypeMetadata, falling back to QueryModel");
}

String alias = fieldMap.get(field);
if (alias == null) {
log.error("Field " + field + " did not have a reverse mapping in the query model");
// no alias to look up; leave the normalizer names untouched
return;
}

normalizerNames.addAll(typeMetadata.getNormalizerNamesForField(alias));
}

/**
* Get the class for the provided name
*
* @param name
* the name
* @return a class
*/
private Class<?> getClass(String name) {
return classCache.computeIfAbsent(name, className -> {
try {
return Class.forName(className);
} catch (ClassNotFoundException e) {
throw new RuntimeException(e);
}
});
}
}
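
A hedged usage sketch of the class above: the field name "COLOR" and the incoming attribute are hypothetical placeholders, and the TypeMetadata is assumed to come from a MetadataHelper as in the ShardQueryLogic change earlier in this commit.

import datawave.query.attributes.Attribute;
import datawave.query.util.TypeMetadata;
import datawave.query.util.transformer.AttributeRebuilder;

class AttributeRebuilderUsageSketch {
    Attribute<?> retype(TypeMetadata typeMetadata, Attribute<?> untyped) {
        // no query model here, so unknown fields cannot fall back to reverse mappings
        AttributeRebuilder rebuilder = new AttributeRebuilder(typeMetadata, null);
        // "COLOR" is a hypothetical field; if no normalizer is found,
        // the original attribute is returned unchanged
        return rebuilder.rebuild("COLOR", untyped);
    }
}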
