Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix GlobalIndexUidAggregator complaining #2302

Draft
wants to merge 1 commit into
base: integration
Choose a base branch
from
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -57,6 +57,7 @@
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Multimap;
import com.google.protobuf.InvalidProtocolBufferException;

import datawave.configuration.DatawaveEmbeddedProjectStageHolder;
import datawave.configuration.spring.SpringBean;
Expand All @@ -70,6 +71,7 @@
import datawave.ingest.mapreduce.handler.shard.AbstractColumnBasedHandler;
import datawave.ingest.mapreduce.job.BulkIngestKey;
import datawave.ingest.mapreduce.job.writer.LiveContextWriter;
import datawave.ingest.protobuf.Uid;
import datawave.ingest.table.config.TableConfigHelper;
import datawave.marking.MarkingFunctions;
import datawave.microservice.querymetric.BaseQueryMetric;
Expand Down Expand Up @@ -147,15 +149,14 @@ public class ShardTableQueryMetricHandler extends BaseQueryMetricHandler<QueryMe
public static final String CONTEXT_WRITER_MAX_CACHE_SIZE = "context.writer.max.cache.size";

// static to share the cache across instances of this class held by QueryExecutorBean, CachedResultsBean, QueryMetricsEnrichmentInterceptor, etc
@SuppressWarnings("unchecked")
private static Map metricsCache = Collections.synchronizedMap(new LRUMap(5000));
private static Map metricsCache = Collections.synchronizedMap(new LRUMap<>(5000));

private final Configuration conf = new Configuration();
private final StatusReporter reporter = new MockStatusReporter();
private final AtomicBoolean tablesChecked = new AtomicBoolean(false);
private AccumuloRecordWriter recordWriter = null;

private UIDBuilder<UID> uidBuilder = UID.builder();
private final UIDBuilder<UID> uidBuilder = UID.builder();

public ShardTableQueryMetricHandler() {
URL queryMetricsUrl = Thread.currentThread().getContextClassLoader().getResource("datawave/query/QueryMetrics.xml");
Expand Down Expand Up @@ -275,7 +276,7 @@ public Map<String,String> getEventFields(BaseQueryMetric queryMetric) {
}

private Multimap<BulkIngestKey,Value> getEntries(AbstractColumnBasedHandler<Key> handler, QueryMetric updatedQueryMetric, QueryMetric storedQueryMetric,
Date lastUpdated, boolean delete) {
Date lastUpdated, boolean delete) throws Exception {
Type type = TypeRegistry.getType("querymetrics");
ContentQueryMetricsIngestHelper ingestHelper = new ContentQueryMetricsIngestHelper(delete);

Expand Down Expand Up @@ -356,12 +357,42 @@ private Multimap<BulkIngestKey,Value> getEntries(AbstractColumnBasedHandler<Key>
// this will ensure that the QueryMetrics can be found within second precision in most cases
entry.getKey().getKey().setTimestamp(storedQueryMetric.getCreateDate().getTime() + storedQueryMetric.getNumUpdates());
}
entry.getKey().getKey().setDeleted(delete);
String table = entry.getKey().getTableName().toString();
if (table.equals(indexTable) || table.equals(reverseIndexTable)) {
Collection<Value> removedValues = Lists.newArrayList();
for (Value value : entry.getValue()) {
removedValues.add(buildRemovalUidList(value));
}
entry.setValue(removedValues);
} else {
entry.getKey().getKey().setDeleted(delete);
}
}

return r;
}

private Value buildRemovalUidList(Value value) throws Exception {
Uid.List uidList;

try {
uidList = Uid.List.parseFrom(value.get());
} catch (InvalidProtocolBufferException e) {
throw new Exception("Unexpected value format for our value" + value);
}

if (uidList.getUIDCount() != 1) {
throw new Exception(("We expect all Uid.List values to be one"));
}

Uid.List.Builder removalList = Uid.List.newBuilder();
removalList.setIGNORE(false);
removalList.setCOUNT(-1);
removalList.addREMOVEDUID(uidList.getUID(0));

return new Value(removalList.build().toByteArray());
}

@SuppressWarnings("unchecked")
@Override
public void updateMetric(QueryMetric updatedQueryMetric, DatawavePrincipal datawavePrincipal) throws Exception {
Expand All @@ -375,7 +406,7 @@ public void updateMetric(QueryMetric updatedQueryMetric, DatawavePrincipal dataw
}

// find and remove previous entries
BaseQueryMetricListResponse response = new QueryMetricListResponse();
BaseQueryMetricListResponse<QueryMetric> response = new QueryMetricListResponse();
Date end = new Date();
Date begin = DateUtils.setYears(end, 2000);

Expand All @@ -393,12 +424,11 @@ public void updateMetric(QueryMetric updatedQueryMetric, DatawavePrincipal dataw
}
}
}
// combine all of the page metrics from the cached metric and the updated metric
// combine all the page metrics from the cached metric and the updated metric
for (PageMetric p : updatedQueryMetric.getPageTimes()) {
storedPageMetricMap.put(p.getPageNumber(), p);
}
ArrayList<PageMetric> newPageMetrics = new ArrayList<>();
newPageMetrics.addAll(storedPageMetricMap.values());
ArrayList<PageMetric> newPageMetrics = new ArrayList<>(storedPageMetricMap.values());
updatedQueryMetric.setPageTimes(newPageMetrics);
metricsCache.put(updatedQueryMetric.getQueryId(), updatedQueryMetric);
}
Expand Down Expand Up @@ -599,8 +629,6 @@ public BaseQueryMetric toMetric(EventBase event) {
m.setPlan(fieldValue);
} else if (fieldName.equals("QUERY_LOGIC")) {
m.setQueryLogic(fieldValue);
} else if (fieldName.equals("QUERY_ID")) {
m.setQueryId(fieldValue);
} else if (fieldName.equals("BEGIN_DATE")) {
try {
Date d = sdf_date_time1.parse(fieldValue);
Expand Down Expand Up @@ -638,7 +666,7 @@ public BaseQueryMetric toMetric(EventBase event) {
if (-1 == index) {
log.error("Could not parse field name to extract repetition count: " + fieldName);
} else {
Long pageNum = Long.parseLong(fieldName.substring(index + 1));
long pageNum = Long.parseLong(fieldName.substring(index + 1));
PageMetric pageMetric = PageMetric.parse(fieldValue);
if (pageMetric != null) {
pageMetric.setPageNumber(pageNum);
Expand Down Expand Up @@ -701,8 +729,6 @@ public BaseQueryMetric toMetric(EventBase event) {
m.addVersion(BaseQueryMetric.DATAWAVE, fieldValue);
} else if (fieldName.startsWith("VERSION.")) {
m.addVersion(fieldName.substring(8), fieldValue);
} else if (fieldName.equals("YIELD_COUNT")) {
m.setYieldCount(Long.parseLong(fieldValue));
} else if (fieldName.equals("LOGIN_TIME")) {
m.setLoginTime(Long.parseLong(fieldValue));
} else {
Expand All @@ -715,9 +741,7 @@ public BaseQueryMetric toMetric(EventBase event) {
try {
String dateStr = event.getMetadata().getRow().substring(0, 8);
m.setCreateDate(sdf_date_time3.parse(dateStr));
} catch (ParseException e) {

}
} catch (ParseException ignored) {}
}
m.setPageTimes(new ArrayList<>(pageMetrics.values()));
return m;
Expand Down