From 06c5437cdb064005a7acd48b993c44b6572aa147 Mon Sep 17 00:00:00 2001 From: Rohan Juneja Date: Fri, 19 Jan 2024 19:26:14 -0800 Subject: [PATCH] Set up child span between threads --- src/controllers/threading/taskHandler.js | 4 ++-- src/controllers/threading/threadHandler.js | 10 +++++++++- 2 files changed, 11 insertions(+), 3 deletions(-) diff --git a/src/controllers/threading/taskHandler.js b/src/controllers/threading/taskHandler.js index f056ad5..bc488de 100644 --- a/src/controllers/threading/taskHandler.js +++ b/src/controllers/threading/taskHandler.js @@ -44,7 +44,7 @@ try { debug(error); } -const runTask = async ({ req, route, port, job: { jobId, queueName } = {} }) => { +const runTask = async ({ req, route, traceparent, tracestate, port, job: { jobId, queueName } = {} }) => { debug(`Worker thread ${threadId} beginning ${workerData.queue} task.`); global.SCHEMA_VERSION = "1.4.0"; @@ -81,7 +81,7 @@ const runTask = async ({ req, route, port, job: { jobId, queueName } = {} }) => scope.setSpan(transaction); }); - span = opentelemetry.trace.getTracer('biothings-explorer-thread').startSpan(routeNames[route]) + span = opentelemetry.trace.getTracer('biothings-explorer-thread').startSpan(routeNames[route], undefined, opentelemetry.propagation.extract(opentelemetry.context.active(), {traceparent, tracestate})) span.setAttribute("bte.requestData", JSON.stringify(req.data.queryGraph)); Telemetry.setOtelSpan(span); } catch (error) { diff --git a/src/controllers/threading/threadHandler.js b/src/controllers/threading/threadHandler.js index 7791b05..bc67bc7 100644 --- a/src/controllers/threading/threadHandler.js +++ b/src/controllers/threading/threadHandler.js @@ -1,4 +1,5 @@ const { MessageChannel, threadId } = require("worker_threads"); +const { context, propagation, trace } = require("@opentelemetry/api"); const debug = require("debug")("bte:biothings-explorer-trapi:threading"); const path = require("path"); // const taskHandler = require("./taskHandler"); @@ -97,7 +98,14 @@ const queueTaskToWorkers = async (pool, req, route, job) => { let WorkerThreadID; const abortController = new AbortController(); const { port1: toWorker, port2: fromWorker } = new MessageChannel(); - const taskData = { req, route, port: toWorker }; + + // get otel context + const otelData = {}; + propagation.inject(context.active(), otelData); + const { traceparent, tracestate } = otelData; + + + const taskData = { req, route, traceparent, tracestate, port: toWorker }; if (job) taskData.job = { jobId: job.id, queueName: job.queue.name }; const task = pool.run(taskData, { signal: abortController.signal, transferList: [toWorker] }); if (job) {