Skip to content

Commit

Permalink
fix: loglevel behavior, async status interactions
Browse files Browse the repository at this point in the history
  • Loading branch information
tokebe committed Dec 14, 2023
1 parent 28e5b58 commit 7f816f0
Show file tree
Hide file tree
Showing 6 changed files with 73 additions and 67 deletions.
4 changes: 2 additions & 2 deletions src/controllers/async/asyncquery.js
Original file line number Diff line number Diff line change
Expand Up @@ -133,9 +133,9 @@ exports.getQueryResponse = async (jobID, logLevel = null) => {
);
const response = Object.fromEntries(values);
if (response.logs && logLevel) {
utils.filterForLogLevel(response, logLevel);
response.logs = utils.filterForLogLevel(response.logs, logLevel);
} else if (response.logs && originalLogLevel) {
utils.filterForLogLevel(response, originalLogLevel);
response.logs = utils.filterForLogLevel(response.logs, originalLogLevel);
}
return response ? response : undefined;
});
Expand Down
125 changes: 65 additions & 60 deletions src/routes/v1/asyncquery_status.js
Original file line number Diff line number Diff line change
Expand Up @@ -31,76 +31,81 @@ class VCheckQueryStatus {
//logger.info("query /query endpoint")
try {
debug(`checking query status of job ${req.params.id}`);
let by = req.data.options.by;
let job_id = req.params.id;
let jobID = req.params.id;
let queryQueue;
if (redisClient.clientEnabled) {
if (job_id.startsWith("BT_")) {
queryQueue = getQueryQueue("bte_query_queue_by_team");
} else if (job_id.startsWith("BA_")) {
queryQueue = getQueryQueue("bte_query_queue_by_api");
} else {
queryQueue = getQueryQueue("bte_query_queue");
}
if (!redisClient.clientEnabled) {
taskResponse({ error: "Redis service is unavailable" }, 503);
}
if (queryQueue) {
let job = await queryQueue.getJobFromId(job_id);

if (job === null) {
return taskResponse(null, 404);
}
await queryQueue.isReady();
const state = await job.getState();
let logs = await queryQueue.getJobLogs(job_id);
logs = logs.logs.map(log => JSON.parse(log));
let [status, description] = {
// convert to TRAPI states
completed: ["Completed", "The query has finished executing."],
failed: ["Failed", job.failedReason],
delayed: ["Queued", "The query is queued, but has been delayed."],
active: ["Running", "The query is currently being processed."],
waiting: ["Queued", "The query is waiting in the queue."],
paused: ["Queued", "The query is queued, but the queue is temporarily paused."],
stuck: ["Failed", "The query is stuck (if you see this, raise an issue)."],
null: ["Failed", "The query status is unknown, presumed failed (if you see this, raise an issue)."],
}[state];
let progress = job._progress;
if (status === "Failed" && !req.endpoint.includes("asyncquery_response")) {
if (description.includes("Promise timed out")) {
// something might break when calculating process.env.JOB_TIMEOUT so wrap it in try catch
try {
return taskResponse({
job_id,
status,
description: `Job was stopped after exceeding time limit of ${
parseInt(process.env.JOB_TIMEOUT ?? (1000 * 60 * 5).toString()) / 1000
}s`,
logs,
});
} catch (e) {
return taskResponse({ job_id, status, description, logs });
}
}
return taskResponse({ job_id, status, description, logs });
}
if (jobID.startsWith("BT_")) {
queryQueue = getQueryQueue("bte_query_queue_by_team");
} else if (jobID.startsWith("BA_")) {
queryQueue = getQueryQueue("bte_query_queue_by_api");
} else {
queryQueue = getQueryQueue("bte_query_queue");
}

let job = await queryQueue.getJobFromId(jobID);
if (job === null) {
return taskResponse(null, 404);
}

// If done, just give response if using the response_url
if ((state === "completed" || state === "failed") && req.endpoint.includes("asyncquery_response")) {
let returnValue;
const storedResponse = await getQueryResponse(job_id, req.data.options.logLevel);
await queryQueue.isReady();

if (!storedResponse.logs && logs) {
storedResponse.logs = logs;
const state = await job.getState();
let progress = job._progress;

let logs = await queryQueue.getJobLogs(jobID);
logs = logs.logs.map(log => JSON.parse(log));
const originalLogLevel = JSON.parse(await redisClient.client.getTimeout(`asyncQueryResult:logLevel:${jobID}`));
logs = utils.filterForLogLevel(logs, req.data.options.log_level ?? originalLogLevel);

// convert to TRAPI states
let [status, description] = {
completed: ["Completed", "The query has finished executing."],
failed: ["Failed", job.failedReason],
delayed: ["Queued", "The query is queued, but has been delayed."],
active: ["Running", "The query is currently being processed."],
waiting: ["Queued", "The query is waiting in the queue."],
paused: ["Queued", "The query is queued, but the queue is temporarily paused."],
stuck: ["Failed", "The query is stuck (if you see this, raise an issue)."],
null: ["Failed", "The query status is unknown, presumed failed (if you see this, raise an issue)."],
}[state];

if (status === "Failed" && !req.endpoint.includes("asyncquery_response")) {
if (description.includes("Promise timed out")) {
// something might break when calculating process.env.JOB_TIMEOUT so wrap it in try catch
try {
return taskResponse({
job_id: jobID,
status,
description: `Job was stopped after exceeding time limit of ${
parseInt(process.env.JOB_TIMEOUT ?? (1000 * 60 * 5).toString()) / 1000
}s`,
logs,
});
} catch (e) {
return taskResponse({ job_id: jobID, status, description, logs });
}
}
return taskResponse({ job_id: jobID, status, description, logs });
}

returnValue = storedResponse ? storedResponse : { error: "Response expired. Responses are kept 30 days." };
return taskResponse(returnValue, returnValue.statusCode || 200);
// If done, just give response if using asyncquery_response
if ((state === "completed" || state === "failed") && req.endpoint.includes("asyncquery_response")) {
let returnValue;
const storedResponse = await getQueryResponse(jobID, req.data.options.log_level);

if (storedResponse && !storedResponse.logs && logs) {
storedResponse.logs = logs;
}

taskResponse({ job_id, status, progress, description, response_url: job.data.url, logs }, 200);
} else {
taskResponse({ error: "Redis service is unavailable" }, 503);
returnValue = storedResponse ? storedResponse : { error: "Response expired. Responses are kept 30 days." };
return taskResponse(returnValue, returnValue.statusCode || 200);
}

// Otherwise respond for asyncquery_status
taskResponse({ job_id: jobID, status, progress, description, response_url: job.data.url, logs }, 200);
} catch (error) {
taskError(error);
}
Expand Down
2 changes: 1 addition & 1 deletion src/routes/v1/query_v1.js
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ class V1RouteQuery {
await handler.query();

const response = handler.getResponse();
utils.filterForLogLevel(response, options.logLevel);
response.logs = utils.filterForLogLevel(response.logs, options.logLevel);
return taskResponse(response);
} catch (error) {
return taskError(error);
Expand Down
2 changes: 1 addition & 1 deletion src/routes/v1/query_v1_by_api.js
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ class RouteQueryV1ByAPI {
handler.setQueryGraph(queryGraph);
await handler.query();
const response = handler.getResponse();
utils.filterForLogLevel(response, options.logLevel);
response.logs = utils.filterForLogLevel(response.logs, options.logLevel);
return taskResponse(response);
} catch (error) {
return taskError(error);
Expand Down
2 changes: 1 addition & 1 deletion src/routes/v1/query_v1_by_team.js
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ class RouteQueryV1ByTeam {
handler.setQueryGraph(queryGraph);
await handler.query();
const response = handler.getResponse();
utils.filterForLogLevel(response, options.logLevel);
response.logs = utils.filterForLogLevel(response.logs, options.logLevel);
return taskResponse(response);
} catch (error) {
return taskError(error);
Expand Down
5 changes: 3 additions & 2 deletions src/utils/common.js
Original file line number Diff line number Diff line change
Expand Up @@ -43,18 +43,19 @@ exports.stringIsAValidUrl = s => {
}
};

exports.filterForLogLevel = (response, logLevel) => {
exports.filterForLogLevel = (logs, logLevel) => {
const logLevels = {
ERROR: 3,
WARNING: 2,
INFO: 1,
DEBUG: 0,
};
if (logLevel && Object.keys(logLevels).includes(logLevel)) {
response.logs = response.logs.filter(log => {
logs = logs.filter(log => {
return logLevels[log.level] >= logLevels[logLevel];
});
}
return logs;
};

exports.methodNotAllowed = (req, res, next) => res.status(405).send();

0 comments on commit 7f816f0

Please sign in to comment.