Skip to content

Commit

Permalink
Re:NationalSecurityAgency#897 - Removed trigger booleans, is page les…
Browse files Browse the repository at this point in the history
…s than requested page size, update status
  • Loading branch information
plainolneesh committed Jun 2, 2021
1 parent cf8025d commit 76f0d0f
Showing 1 changed file with 33 additions and 31 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -45,11 +45,11 @@
*
*/
public class RunningQuery extends AbstractRunningQuery implements Runnable {

private static final long serialVersionUID = 1L;

private static Logger log = Logger.getLogger(RunningQuery.class);

private transient AccumuloClient client = null;
private AccumuloConnectionFactory.Priority connectionPriority = null;
private transient QueryLogic<?> logic = null;
Expand All @@ -67,32 +67,32 @@ public class RunningQuery extends AbstractRunningQuery implements Runnable {
private volatile Future<Object> future = null;
private QueryPredictor predictor = null;
private long maxResults = 0;

public RunningQuery() {
super(new QueryMetricFactoryImpl());
}

public RunningQuery(AccumuloClient client, AccumuloConnectionFactory.Priority priority, QueryLogic<?> logic, Query settings, String methodAuths,
Principal principal, QueryMetricFactory metricFactory) throws Exception {
this(null, client, priority, logic, settings, methodAuths, principal, null, null, metricFactory);
}

public RunningQuery(AccumuloClient client, AccumuloConnectionFactory.Priority priority, QueryLogic<?> logic, Query settings, String methodAuths,
Principal principal, RunningQueryTiming timing, ExecutorService executor, QueryMetricFactory metricFactory) throws Exception {
this(null, client, priority, logic, settings, methodAuths, principal, timing, executor, metricFactory);
}

public RunningQuery(QueryMetricsBean queryMetrics, AccumuloClient client, AccumuloConnectionFactory.Priority priority, QueryLogic<?> logic, Query settings,
String methodAuths, Principal principal, QueryMetricFactory metricFactory) throws Exception {
this(queryMetrics, client, priority, logic, settings, methodAuths, principal, null, null, metricFactory);
}

public RunningQuery(QueryMetricsBean queryMetrics, AccumuloClient client, AccumuloConnectionFactory.Priority priority, QueryLogic<?> logic, Query settings,
String methodAuths, Principal principal, RunningQueryTiming timing, ExecutorService executor, QueryMetricFactory metricFactory)
throws Exception {
this(queryMetrics, client, priority, logic, settings, methodAuths, principal, timing, executor, null, metricFactory);
}

public RunningQuery(QueryMetricsBean queryMetrics, AccumuloClient client, AccumuloConnectionFactory.Priority priority, QueryLogic<?> logic, Query settings,
String methodAuths, Principal principal, RunningQueryTiming timing, ExecutorService executor, QueryPredictor predictor,
QueryMetricFactory metricFactory) throws Exception {
Expand Down Expand Up @@ -128,7 +128,7 @@ public RunningQuery(QueryMetricsBean queryMetrics, AccumuloClient client, Accumu
+ " has a DN configured with a different limit");
}
}

public static RunningQuery createQueryWithAuthorizations(QueryMetricsBean queryMetrics, AccumuloClient client, AccumuloConnectionFactory.Priority priority,
QueryLogic<?> logic, Query settings, String methodAuths, RunningQueryTiming timing, ExecutorService executor, QueryPredictor predictor,
QueryMetricFactory metricFactory) throws Exception {
Expand All @@ -137,27 +137,27 @@ public static RunningQuery createQueryWithAuthorizations(QueryMetricsBean queryM
runningQuery.calculatedAuths = Collections.singleton(new Authorizations(methodAuths));
return runningQuery;
}

private void addNDC() {
String user = this.settings.getUserDN();
UUID uuid = this.settings.getId();
if (user != null && uuid != null) {
NDC.push("[" + user + "] [" + uuid + "]");
}
}

private void removeNDC() {
NDC.pop();
}

public void setClient(AccumuloClient client) throws Exception {
// if we are setting this null, we shouldn't try to initialize
// the internal logic
if (client == null) {
this.client = null;
return;
}

try {
addNDC();
applyPrediction(null);
Expand Down Expand Up @@ -190,22 +190,20 @@ public void setClient(AccumuloClient client) throws Exception {
}
}
}

public ResultsPage next() throws Exception {
// update AbstractRunningQuery.lastUsed
touch();
long pageStartTime = System.currentTimeMillis();
List<Object> resultList = new ArrayList<>();
boolean hitPageByteTrigger = false;
boolean hitPageTimeTrigger = false;
try {
addNDC();
int currentPageCount = 0;
long currentPageBytes = 0;

// test for any exceptions prior to loop as hasNext() would likely be false;
testForUncaughtException(resultList.size());

while (!this.finished && ((future != null) || this.iter.hasNext())) {
// if we are canceled, then break out
if (this.canceled) {
Expand All @@ -226,7 +224,6 @@ public ResultsPage next() throws Exception {
// if the logic had a page byte trigger and we have readed that, then break out
if (this.logic.getPageByteTrigger() > 0 && currentPageBytes >= this.logic.getPageByteTrigger()) {
log.info("Query logic max page byte trigger has been reached, aborting query.next call");
hitPageByteTrigger = true;
break;
}
// if the logic had a max num results (across all pages) and we have reached that (or the maxResultsOverride if set), then break out
Expand All @@ -240,7 +237,13 @@ public ResultsPage next() throws Exception {
log.info("Query logic max results has been reached, aborting query.next call");
this.getMetric().setLifecycle(QueryMetric.Lifecycle.MAXRESULTS);
break;

} else if (numResults >= this.maxResults) {
log.info("Results are greater than max results, aborting query.next call");
this.getMetric().setLifecycle(BaseQueryMetric.Lifecycle.MAXRESULTS);
break;
}

if (this.logic.getMaxWork() >= 0 && (this.getMetric().getNextCount() + this.getMetric().getSeekCount()) >= this.logic.getMaxWork()) {
log.info("Query logic max work has been reached, aborting query.next call");
this.getMetric().setLifecycle(QueryMetric.Lifecycle.MAXWORK);
Expand All @@ -251,15 +254,14 @@ public ResultsPage next() throws Exception {
// use the pagestart time for the time in call since we only care about the execution time of
// this page.
long pageTimeInCall = (System.currentTimeMillis() - pageStartTime);

int maxPageSize = Math.min(this.settings.getPagesize(), this.logic.getMaxPageSize());
if (timing != null && currentPageCount > 0 && timing.shouldReturnPartialResults(currentPageCount, maxPageSize, pageTimeInCall)) {
log.info("Query logic max expire before page is full, returning existing results " + currentPageCount + " " + maxPageSize + " "
+ pageTimeInCall + " " + timing);
hitPageTimeTrigger = true;
break;
}

Object o = null;
if (executor != null) {
if (future == null) {
Expand All @@ -281,12 +283,12 @@ public ResultsPage next() throws Exception {
} else {
o = iter.next();
}

// regardless whether the transform iterator returned a result, it may have updated the metrics (next/seek calls etc.)
if (iter.getTransformer() instanceof WritesQueryMetrics) {
((WritesQueryMetrics) iter.getTransformer()).writeQueryMetrics(this.getMetric());
}

// if not still waiting on a future, then process the result (or lack thereof)
if (future == null) {
if (null == o) {
Expand All @@ -301,13 +303,13 @@ public ResultsPage next() throws Exception {
currentPageCount++;
numResults++;
}

testForUncaughtException(resultList.size());
}

// if the last hasNext() call failed, then we would catch the exception here
testForUncaughtException(resultList.size());

// Update the metric
long now = System.currentTimeMillis();
this.getMetric().addPageTime(currentPageCount, now - pageStartTime, pageStartTime, now);
Expand All @@ -323,7 +325,7 @@ public ResultsPage next() throws Exception {
// update AbstractRunningQuery.lastUsed in case this operation took a long time
touch();
removeNDC();

if (this.queryMetrics != null) {
try {
this.queryMetrics.updateMetric(this.getMetric());
Expand All @@ -335,10 +337,10 @@ public ResultsPage next() throws Exception {
if (resultList.isEmpty()) {
return new ResultsPage();
} else {
return new ResultsPage(resultList, ((hitPageByteTrigger || hitPageTimeTrigger) ? ResultsPage.Status.PARTIAL : ResultsPage.Status.COMPLETE));
return new ResultsPage(resultList,ResultsPage.Status.PARTIAL);
}
}

public void cancel() {
this.canceled = true;
// save off the future as it could be removed at any time
Expand Down

0 comments on commit 76f0d0f

Please sign in to comment.