Skip to content

Commit

Permalink
Initial queryMetrics delete flag fix
Browse files Browse the repository at this point in the history
  • Loading branch information
mineralntl committed Mar 13, 2024
1 parent b087ac9 commit 4e4455c
Showing 1 changed file with 41 additions and 17 deletions.
Expand Up @@ -57,6 +57,7 @@
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Multimap;
import com.google.protobuf.InvalidProtocolBufferException;

import datawave.configuration.DatawaveEmbeddedProjectStageHolder;
import datawave.configuration.spring.SpringBean;
Expand All @@ -70,6 +71,7 @@
import datawave.ingest.mapreduce.handler.shard.AbstractColumnBasedHandler;
import datawave.ingest.mapreduce.job.BulkIngestKey;
import datawave.ingest.mapreduce.job.writer.LiveContextWriter;
import datawave.ingest.protobuf.Uid;
import datawave.ingest.table.config.TableConfigHelper;
import datawave.marking.MarkingFunctions;
import datawave.microservice.querymetric.BaseQueryMetric;
Expand Down Expand Up @@ -147,15 +149,14 @@ public class ShardTableQueryMetricHandler extends BaseQueryMetricHandler<QueryMe
public static final String CONTEXT_WRITER_MAX_CACHE_SIZE = "context.writer.max.cache.size";

// static to share the cache across instances of this class held by QueryExecutorBean, CachedResultsBean, QueryMetricsEnrichmentInterceptor, etc
@SuppressWarnings("unchecked")
private static Map metricsCache = Collections.synchronizedMap(new LRUMap(5000));
private static Map metricsCache = Collections.synchronizedMap(new LRUMap<>(5000));

private final Configuration conf = new Configuration();
private final StatusReporter reporter = new MockStatusReporter();
private final AtomicBoolean tablesChecked = new AtomicBoolean(false);
private AccumuloRecordWriter recordWriter = null;

private UIDBuilder<UID> uidBuilder = UID.builder();
private final UIDBuilder<UID> uidBuilder = UID.builder();

public ShardTableQueryMetricHandler() {
URL queryMetricsUrl = Thread.currentThread().getContextClassLoader().getResource("datawave/query/QueryMetrics.xml");
Expand Down Expand Up @@ -275,7 +276,7 @@ public Map<String,String> getEventFields(BaseQueryMetric queryMetric) {
}

private Multimap<BulkIngestKey,Value> getEntries(AbstractColumnBasedHandler<Key> handler, QueryMetric updatedQueryMetric, QueryMetric storedQueryMetric,
Date lastUpdated, boolean delete) {
Date lastUpdated, boolean delete) throws Exception {
Type type = TypeRegistry.getType("querymetrics");
ContentQueryMetricsIngestHelper ingestHelper = new ContentQueryMetricsIngestHelper(delete);

Expand Down Expand Up @@ -356,12 +357,42 @@ private Multimap<BulkIngestKey,Value> getEntries(AbstractColumnBasedHandler<Key>
// this will ensure that the QueryMetrics can be found within second precision in most cases
entry.getKey().getKey().setTimestamp(storedQueryMetric.getCreateDate().getTime() + storedQueryMetric.getNumUpdates());
}
entry.getKey().getKey().setDeleted(delete);
String table = entry.getKey().getTableName().toString();
if (table.equals(indexTable) || table.equals(reverseIndexTable)) {
Collection<Value> removedValues = Lists.newArrayList();
for (Value value : entry.getValue()) {
removedValues.add(buildRemovalUidList(value));
}
entry.setValue(removedValues);
} else {
entry.getKey().getKey().setDeleted(delete);
}
}

return r;
}

private Value buildRemovalUidList(Value value) throws Exception {
Uid.List uidList;

try {
uidList = Uid.List.parseFrom(value.get());
} catch (InvalidProtocolBufferException e) {
throw new Exception("Unexpected value format for our value" + value);
}

if (uidList.getUIDCount() != 1) {
throw new Exception(("We expect all Uid.List values to be one"));
}

Uid.List.Builder removalList = Uid.List.newBuilder();
removalList.setIGNORE(false);
removalList.setCOUNT(-1);
removalList.addREMOVEDUID(uidList.getUID(0));

return new Value(removalList.build().toByteArray());
}

@SuppressWarnings("unchecked")
@Override
public void updateMetric(QueryMetric updatedQueryMetric, DatawavePrincipal datawavePrincipal) throws Exception {
Expand All @@ -375,7 +406,7 @@ public void updateMetric(QueryMetric updatedQueryMetric, DatawavePrincipal dataw
}

// find and remove previous entries
BaseQueryMetricListResponse response = new QueryMetricListResponse();
BaseQueryMetricListResponse<QueryMetric> response = new QueryMetricListResponse();
Date end = new Date();
Date begin = DateUtils.setYears(end, 2000);

Expand All @@ -393,12 +424,11 @@ public void updateMetric(QueryMetric updatedQueryMetric, DatawavePrincipal dataw
}
}
}
// combine all of the page metrics from the cached metric and the updated metric
// combine all the page metrics from the cached metric and the updated metric
for (PageMetric p : updatedQueryMetric.getPageTimes()) {
storedPageMetricMap.put(p.getPageNumber(), p);
}
ArrayList<PageMetric> newPageMetrics = new ArrayList<>();
newPageMetrics.addAll(storedPageMetricMap.values());
ArrayList<PageMetric> newPageMetrics = new ArrayList<>(storedPageMetricMap.values());
updatedQueryMetric.setPageTimes(newPageMetrics);
metricsCache.put(updatedQueryMetric.getQueryId(), updatedQueryMetric);
}
Expand Down Expand Up @@ -599,8 +629,6 @@ public BaseQueryMetric toMetric(EventBase event) {
m.setPlan(fieldValue);
} else if (fieldName.equals("QUERY_LOGIC")) {
m.setQueryLogic(fieldValue);
} else if (fieldName.equals("QUERY_ID")) {
m.setQueryId(fieldValue);
} else if (fieldName.equals("BEGIN_DATE")) {
try {
Date d = sdf_date_time1.parse(fieldValue);
Expand Down Expand Up @@ -638,7 +666,7 @@ public BaseQueryMetric toMetric(EventBase event) {
if (-1 == index) {
log.error("Could not parse field name to extract repetition count: " + fieldName);
} else {
Long pageNum = Long.parseLong(fieldName.substring(index + 1));
long pageNum = Long.parseLong(fieldName.substring(index + 1));
PageMetric pageMetric = PageMetric.parse(fieldValue);
if (pageMetric != null) {
pageMetric.setPageNumber(pageNum);
Expand Down Expand Up @@ -701,8 +729,6 @@ public BaseQueryMetric toMetric(EventBase event) {
m.addVersion(BaseQueryMetric.DATAWAVE, fieldValue);
} else if (fieldName.startsWith("VERSION.")) {
m.addVersion(fieldName.substring(8), fieldValue);
} else if (fieldName.equals("YIELD_COUNT")) {
m.setYieldCount(Long.parseLong(fieldValue));
} else if (fieldName.equals("LOGIN_TIME")) {
m.setLoginTime(Long.parseLong(fieldValue));
} else {
Expand All @@ -715,9 +741,7 @@ public BaseQueryMetric toMetric(EventBase event) {
try {
String dateStr = event.getMetadata().getRow().substring(0, 8);
m.setCreateDate(sdf_date_time3.parse(dateStr));
} catch (ParseException e) {

}
} catch (ParseException ignored) {}
}
m.setPageTimes(new ArrayList<>(pageMetrics.values()));
return m;
Expand Down

0 comments on commit 4e4455c

Please sign in to comment.