Skip to content

Commit

Permalink
new threading messaging type
Browse files Browse the repository at this point in the history
  • Loading branch information
rjawesome committed Jun 26, 2024
1 parent 6b794a9 commit 4b9a9ba
Showing 1 changed file with 44 additions and 41 deletions.
85 changes: 44 additions & 41 deletions src/controllers/threading/threadHandler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ import ErrorHandler from "../../middlewares/error";
import { Request, Response } from "express";
import { BullJob, PiscinaWaitTime, ThreadPool } from "../../types";
import { TaskInfo, InnerTaskData, QueryHandlerOptions } from "@biothings-explorer/types";
import { DialHome, TrapiQuery, TrapiResponse } from "@biothings-explorer/types";
import { ThreadMessage, TrapiQuery, TrapiResponse } from "@biothings-explorer/types";
import { Queue } from "bull";

const SYNC_MIN_CONCURRENCY = 2;
Expand Down Expand Up @@ -98,7 +98,7 @@ if (!global.threadpool && !Piscina.isWorkerThread && !(process.env.USE_THREADING
} as ThreadPool;
}

async function queueTaskToWorkers(pool: Piscina, taskInfo: TaskInfo, route: string, job?: BullJob): Promise<DialHome> {
async function queueTaskToWorkers(pool: Piscina, taskInfo: TaskInfo, route: string, job?: BullJob): Promise<ThreadMessage> {
return new Promise((resolve, reject) => {
let workerThreadID: string;
const abortController = new AbortController();
Expand Down Expand Up @@ -154,34 +154,37 @@ async function queueTaskToWorkers(pool: Piscina, taskInfo: TaskInfo, route: stri
} = {};
const timeout = parseInt(process.env.REQUEST_TIMEOUT ?? (60 * 5).toString()) * 1000;

fromWorker.on("message", (msg: DialHome) => {
if (msg.cacheInProgress) {
// Cache handler has started caching
cacheInProgress += 1;
} else if (msg.addCacheKey) {
// Hashed edge id cache in progress
cacheKeys[msg.addCacheKey] = false;
} else if (msg.completeCacheKey) {
// Hashed edge id cache complete
cacheKeys[msg.completeCacheKey] = true;
} else if (msg.registerId) {
// Worker registers itself for better tracking
workerThreadID = String(msg.threadId);
if (job) {
void job.update({ ...job.data, threadId });
}
} else if (typeof msg.cacheDone !== "undefined") {
cacheInProgress = msg.cacheDone
? cacheInProgress - 1 // A caching handler has finished caching
: 0; // Caching has been entirely cancelled
} else if (typeof msg.result !== "undefined") {
// Request has finished with a message
reqDone = true;
resolve(msg);
} else if (msg.err) {
// Request has resulted in a catchable error
reqDone = true;
reject(msg.err);
fromWorker.on("message", (msg: ThreadMessage) => {
switch (msg.type) {
default:
debug(`WARNING: received untyped message from thread {msg.threadId}`);
break;
case "result":
reqDone = true;
resolve(msg);
break;
case "error":
reqDone = true;
reject(msg.value as Error);
break;
case "cacheInProgress":
cacheInProgress += 1;
break;
case "addCacheKey":
cacheKeys[msg.value as string] = false;
break;
case "completeCacheKey":
cacheKeys[msg.value as string] = true;
break;
case "registerId":
workerThreadID = String(msg.threadId);
if (job) {
void job.update({ ...job.data, threadId });
}
break;
case "cacheDone":
cacheInProgress = msg.value ? cacheInProgress - 1 : 0;
break;
}
if (reqDone && cacheInProgress <= 0 && job) {
void job.progress(100);
Expand Down Expand Up @@ -252,13 +255,13 @@ export async function runTask(req: Request, res: Response, route: string, useBul
route,
);

if (typeof response.result !== "undefined") {
if (response.type === 'result') {
if (response.status) {
res?.status(response.status as number);
}
return response.result ? response.result : undefined; // null msg means keep response body empty
} else if (response.err) {
throw response.err;
return response.value ? (response.value as TrapiResponse) : undefined; // null msg means keep response body empty
} else if (response.type === "error") {
throw response.value as Error;
} else {
throw new Error("Threading Error: Task resolved without message");
}
Expand Down Expand Up @@ -325,18 +328,18 @@ export async function runBullJob(job: BullJob, route: string, useAsync = true) {
route,
job,
);
if (typeof response.result !== "undefined") {
return response.result ? response.result : undefined; // null result means keep response body empty
} else if (response.err) {
throw response.err;
if (response.type === "result") {
return response.value ? (response.value as TrapiResponse) : undefined; // null result means keep response body empty
} else if (response.type === "error") {
throw response.value as Error;
} else {
throw new Error("Threading Error: Task resolved without message");
}
}

export function taskResponse<T>(response: T, status: string | number = undefined): T {
export function taskResponse<T>(response: T, status: number = undefined): T {
if (global.parentPort) {
global.parentPort.postMessage({ threadId, result: response, status: status });
global.parentPort.postMessage({ threadId, type: 'result', value: response, status } satisfies ThreadMessage);
return undefined;
} else {
return response;
Expand All @@ -348,7 +351,7 @@ export function taskError(error: Error): void {
if (ErrorHandler.shouldHandleError(error)) {
Telemetry.captureException(error);
}
global.parentPort.postMessage({ threadId, err: error });
global.parentPort.postMessage({ threadId, type: 'error', value: error } satisfies ThreadMessage);
return undefined;
} else {
throw error;
Expand Down

0 comments on commit 4b9a9ba

Please sign in to comment.