Skip to content

Commit

Permalink
Re:NationalSecurityAgency#879 - Removed trigger booleans, is page les…
Browse files Browse the repository at this point in the history
…s than requested page size, update status
  • Loading branch information
plainolneesh committed Jun 2, 2021
1 parent 7bd9ac2 commit e7c9771
Showing 1 changed file with 30 additions and 31 deletions.
Expand Up @@ -45,11 +45,11 @@
*
*/
public class RunningQuery extends AbstractRunningQuery implements Runnable {

private static final long serialVersionUID = 1L;

private static Logger log = Logger.getLogger(RunningQuery.class);

private transient AccumuloClient client = null;
private AccumuloConnectionFactory.Priority connectionPriority = null;
private transient QueryLogic<?> logic = null;
Expand All @@ -67,32 +67,32 @@ public class RunningQuery extends AbstractRunningQuery implements Runnable {
private volatile Future<Object> future = null;
private QueryPredictor predictor = null;
private long maxResults = 0;

public RunningQuery() {
super(new QueryMetricFactoryImpl());
}

public RunningQuery(AccumuloClient client, AccumuloConnectionFactory.Priority priority, QueryLogic<?> logic, Query settings, String methodAuths,
Principal principal, QueryMetricFactory metricFactory) throws Exception {
this(null, client, priority, logic, settings, methodAuths, principal, null, null, metricFactory);
}

public RunningQuery(AccumuloClient client, AccumuloConnectionFactory.Priority priority, QueryLogic<?> logic, Query settings, String methodAuths,
Principal principal, RunningQueryTiming timing, ExecutorService executor, QueryMetricFactory metricFactory) throws Exception {
this(null, client, priority, logic, settings, methodAuths, principal, timing, executor, metricFactory);
}

public RunningQuery(QueryMetricsBean queryMetrics, AccumuloClient client, AccumuloConnectionFactory.Priority priority, QueryLogic<?> logic, Query settings,
String methodAuths, Principal principal, QueryMetricFactory metricFactory) throws Exception {
this(queryMetrics, client, priority, logic, settings, methodAuths, principal, null, null, metricFactory);
}

public RunningQuery(QueryMetricsBean queryMetrics, AccumuloClient client, AccumuloConnectionFactory.Priority priority, QueryLogic<?> logic, Query settings,
String methodAuths, Principal principal, RunningQueryTiming timing, ExecutorService executor, QueryMetricFactory metricFactory)
throws Exception {
this(queryMetrics, client, priority, logic, settings, methodAuths, principal, timing, executor, null, metricFactory);
}

public RunningQuery(QueryMetricsBean queryMetrics, AccumuloClient client, AccumuloConnectionFactory.Priority priority, QueryLogic<?> logic, Query settings,
String methodAuths, Principal principal, RunningQueryTiming timing, ExecutorService executor, QueryPredictor predictor,
QueryMetricFactory metricFactory) throws Exception {
Expand Down Expand Up @@ -128,7 +128,7 @@ public RunningQuery(QueryMetricsBean queryMetrics, AccumuloClient client, Accumu
+ " has a DN configured with a different limit");
}
}

public static RunningQuery createQueryWithAuthorizations(QueryMetricsBean queryMetrics, AccumuloClient client, AccumuloConnectionFactory.Priority priority,
QueryLogic<?> logic, Query settings, String methodAuths, RunningQueryTiming timing, ExecutorService executor, QueryPredictor predictor,
QueryMetricFactory metricFactory) throws Exception {
Expand All @@ -137,27 +137,27 @@ public static RunningQuery createQueryWithAuthorizations(QueryMetricsBean queryM
runningQuery.calculatedAuths = Collections.singleton(new Authorizations(methodAuths));
return runningQuery;
}

private void addNDC() {
String user = this.settings.getUserDN();
UUID uuid = this.settings.getId();
if (user != null && uuid != null) {
NDC.push("[" + user + "] [" + uuid + "]");
}
}

private void removeNDC() {
NDC.pop();
}

public void setClient(AccumuloClient client) throws Exception {
// if we are setting this null, we shouldn't try to initialize
// the internal logic
if (client == null) {
this.client = null;
return;
}

try {
addNDC();
applyPrediction(null);
Expand Down Expand Up @@ -190,24 +190,23 @@ public void setClient(AccumuloClient client) throws Exception {
}
}
}

public ResultsPage next() throws Exception {
// update AbstractRunningQuery.lastUsed
touch();
long pageStartTime = System.currentTimeMillis();
List<Object> resultList = new ArrayList<>();

int currentPageCount = 0;
long currentPageBytes = 0;
int maxPageSize = Math.min(this.settings.getPagesize(), this.logic.getMaxPageSize());

try {
addNDC();



// test for any exceptions prior to loop as hasNext() would likely be false;
testForUncaughtException(resultList.size());

while (!this.finished && ((future != null) || this.iter.hasNext())) {
// if we are canceled, then break out
if (this.canceled) {
Expand Down Expand Up @@ -242,8 +241,7 @@ public ResultsPage next() throws Exception {
this.getMetric().setLifecycle(QueryMetric.Lifecycle.MAXRESULTS);
break;
}



if (this.logic.getMaxWork() >= 0 && (this.getMetric().getNextCount() + this.getMetric().getSeekCount()) >= this.logic.getMaxWork()) {
log.info("Query logic max work has been reached, aborting query.next call");
this.getMetric().setLifecycle(QueryMetric.Lifecycle.MAXWORK);
Expand All @@ -254,13 +252,13 @@ public ResultsPage next() throws Exception {
// use the pagestart time for the time in call since we only care about the execution time of
// this page.
long pageTimeInCall = (System.currentTimeMillis() - pageStartTime);

if (timing != null && currentPageCount > 0 && timing.shouldReturnPartialResults(currentPageCount, maxPageSize, pageTimeInCall)) {
log.info("Query logic max expire before page is full, returning existing results " + currentPageCount + " " + maxPageSize + " "
+ pageTimeInCall + " " + timing);
+ pageTimeInCall + " " + timing);
break;
}

Object o = null;
if (executor != null) {
if (future == null) {
Expand All @@ -282,12 +280,12 @@ public ResultsPage next() throws Exception {
} else {
o = iter.next();
}

// regardless whether the transform iterator returned a result, it may have updated the metrics (next/seek calls etc.)
if (iter.getTransformer() instanceof WritesQueryMetrics) {
((WritesQueryMetrics) iter.getTransformer()).writeQueryMetrics(this.getMetric());
}

// if not still waiting on a future, then process the result (or lack thereof)
if (future == null) {
if (null == o) {
Expand All @@ -302,13 +300,13 @@ public ResultsPage next() throws Exception {
currentPageCount++;
numResults++;
}

testForUncaughtException(resultList.size());
}

// if the last hasNext() call failed, then we would catch the exception here
testForUncaughtException(resultList.size());

// Update the metric
long now = System.currentTimeMillis();
this.getMetric().addPageTime(currentPageCount, now - pageStartTime, pageStartTime, now);
Expand All @@ -324,7 +322,7 @@ public ResultsPage next() throws Exception {
// update AbstractRunningQuery.lastUsed in case this operation took a long time
touch();
removeNDC();

if (this.queryMetrics != null) {
try {
this.queryMetrics.updateMetric(this.getMetric());
Expand All @@ -336,6 +334,7 @@ public ResultsPage next() throws Exception {
ResultsPage.Status status = (currentPageCount < maxPageSize) ? ResultsPage.Status.PARTIAL : ResultsPage.Status.COMPLETE;
return new ResultsPage(resultList, status);
}

public void cancel() {
this.canceled = true;
// save off the future as it could be removed at any time
Expand Down

0 comments on commit e7c9771

Please sign in to comment.