diff --git a/src/controllers/threading/threadHandler.ts b/src/controllers/threading/threadHandler.ts index 7f91bb3..37ae209 100644 --- a/src/controllers/threading/threadHandler.ts +++ b/src/controllers/threading/threadHandler.ts @@ -16,7 +16,7 @@ import ErrorHandler from "../../middlewares/error"; import { Request, Response } from "express"; import { BullJob, PiscinaWaitTime, ThreadPool } from "../../types"; import { TaskInfo, InnerTaskData, QueryHandlerOptions } from "@biothings-explorer/types"; -import { DialHome, TrapiQuery, TrapiResponse } from "@biothings-explorer/types"; +import { ThreadMessage, TrapiQuery, TrapiResponse } from "@biothings-explorer/types"; import { Queue } from "bull"; const SYNC_MIN_CONCURRENCY = 2; @@ -98,7 +98,7 @@ if (!global.threadpool && !Piscina.isWorkerThread && !(process.env.USE_THREADING } as ThreadPool; } -async function queueTaskToWorkers(pool: Piscina, taskInfo: TaskInfo, route: string, job?: BullJob): Promise { +async function queueTaskToWorkers(pool: Piscina, taskInfo: TaskInfo, route: string, job?: BullJob): Promise { return new Promise((resolve, reject) => { let workerThreadID: string; const abortController = new AbortController(); @@ -154,34 +154,37 @@ async function queueTaskToWorkers(pool: Piscina, taskInfo: TaskInfo, route: stri } = {}; const timeout = parseInt(process.env.REQUEST_TIMEOUT ?? (60 * 5).toString()) * 1000; - fromWorker.on("message", (msg: DialHome) => { - if (msg.cacheInProgress) { - // Cache handler has started caching - cacheInProgress += 1; - } else if (msg.addCacheKey) { - // Hashed edge id cache in progress - cacheKeys[msg.addCacheKey] = false; - } else if (msg.completeCacheKey) { - // Hashed edge id cache complete - cacheKeys[msg.completeCacheKey] = true; - } else if (msg.registerId) { - // Worker registers itself for better tracking - workerThreadID = String(msg.threadId); - if (job) { - void job.update({ ...job.data, threadId }); - } - } else if (typeof msg.cacheDone !== "undefined") { - cacheInProgress = msg.cacheDone - ? cacheInProgress - 1 // A caching handler has finished caching - : 0; // Caching has been entirely cancelled - } else if (typeof msg.result !== "undefined") { - // Request has finished with a message - reqDone = true; - resolve(msg); - } else if (msg.err) { - // Request has resulted in a catchable error - reqDone = true; - reject(msg.err); + fromWorker.on("message", (msg: ThreadMessage) => { + switch (msg.type) { + default: + debug(`WARNING: received untyped message from thread {msg.threadId}`); + break; + case "result": + reqDone = true; + resolve(msg); + break; + case "error": + reqDone = true; + reject(msg.value as Error); + break; + case "cacheInProgress": + cacheInProgress += 1; + break; + case "addCacheKey": + cacheKeys[msg.value as string] = false; + break; + case "completeCacheKey": + cacheKeys[msg.value as string] = true; + break; + case "registerId": + workerThreadID = String(msg.threadId); + if (job) { + void job.update({ ...job.data, threadId }); + } + break; + case "cacheDone": + cacheInProgress = msg.value ? cacheInProgress - 1 : 0; + break; } if (reqDone && cacheInProgress <= 0 && job) { void job.progress(100); @@ -252,13 +255,13 @@ export async function runTask(req: Request, res: Response, route: string, useBul route, ); - if (typeof response.result !== "undefined") { + if (response.type === 'result') { if (response.status) { res?.status(response.status as number); } - return response.result ? response.result : undefined; // null msg means keep response body empty - } else if (response.err) { - throw response.err; + return response.value ? (response.value as TrapiResponse) : undefined; // null msg means keep response body empty + } else if (response.type === "error") { + throw response.value as Error; } else { throw new Error("Threading Error: Task resolved without message"); } @@ -325,18 +328,18 @@ export async function runBullJob(job: BullJob, route: string, useAsync = true) { route, job, ); - if (typeof response.result !== "undefined") { - return response.result ? response.result : undefined; // null result means keep response body empty - } else if (response.err) { - throw response.err; + if (response.type === "result") { + return response.value ? (response.value as TrapiResponse) : undefined; // null result means keep response body empty + } else if (response.type === "error") { + throw response.value as Error; } else { throw new Error("Threading Error: Task resolved without message"); } } -export function taskResponse(response: T, status: string | number = undefined): T { +export function taskResponse(response: T, status: number = undefined): T { if (global.parentPort) { - global.parentPort.postMessage({ threadId, result: response, status: status }); + global.parentPort.postMessage({ threadId, type: 'result', value: response, status } satisfies ThreadMessage); return undefined; } else { return response; @@ -348,7 +351,7 @@ export function taskError(error: Error): void { if (ErrorHandler.shouldHandleError(error)) { Telemetry.captureException(error); } - global.parentPort.postMessage({ threadId, err: error }); + global.parentPort.postMessage({ threadId, type: 'error', value: error } satisfies ThreadMessage); return undefined; } else { throw error;