Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(instrumentation): Updated Langchain instrumentation to use Langchain Callbacks #902

Closed
wants to merge 7 commits into from
Closed
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,21 @@
from typing import Collection
from opentelemetry.instrumentation.langchain.config import Config
from wrapt import wrap_function_wrapper
from importlib.metadata import version as package_version, PackageNotFoundError

from opentelemetry.trace import get_tracer

from opentelemetry.instrumentation.instrumentor import BaseInstrumentor
from opentelemetry.instrumentation.utils import unwrap
from opentelemetry.instrumentation.langchain.utils import _with_tracer_wrapper

from opentelemetry.instrumentation.langchain.version import __version__
from opentelemetry import context as context_api
from opentelemetry.instrumentation.utils import _SUPPRESS_INSTRUMENTATION_KEY

from opentelemetry.semconv.ai import TraceloopSpanKindValues

from opentelemetry.instrumentation.langchain.init_wrapper import init_wrapper

from opentelemetry.instrumentation.langchain.task_wrapper import (
task_wrapper,
Expand All @@ -26,134 +36,108 @@
chat_wrapper,
achat_wrapper,
)
from opentelemetry.instrumentation.langchain.version import __version__

from opentelemetry.semconv.ai import TraceloopSpanKindValues

logger = logging.getLogger(__name__)

_instruments = ("langchain >= 0.0.346", "langchain-core > 0.1.0")

WRAPPED_METHODS = [
TO_INSTRUMENT = [
{
"package": "langchain.chains.base",
"object": "Chain",
"method": "__call__",
"wrapper": task_wrapper,
},
{
"package": "langchain.chains.base",
"object": "Chain",
"method": "acall",
"wrapper": atask_wrapper,
},
{
"package": "langchain.chains",
"object": "SequentialChain",
"method": "__call__",
"span_name": "langchain.workflow",
"wrapper": workflow_wrapper,
},
{
"package": "langchain.chains",
"object": "SequentialChain",
"method": "acall",
"span_name": "langchain.workflow",
"wrapper": aworkflow_wrapper,
"class": "Chain",
"callback_supported": True,
},
{
"package": "langchain.agents",
"object": "AgentExecutor",
"method": "_call",
"span_name": "langchain.agent",
"kind": TraceloopSpanKindValues.AGENT.value,
"wrapper": workflow_wrapper,
"class": "Agent",
"callback_supported": True,
},
{
"package": "langchain.tools",
"object": "Tool",
"method": "_run",
"class": "Tool",
"span_name": "langchain.tool",
"kind": TraceloopSpanKindValues.TOOL.value,
"wrapper": task_wrapper,
},
{
"package": "langchain.chains",
"object": "RetrievalQA",
"method": "__call__",
"span_name": "retrieval_qa.workflow",
"wrapper": workflow_wrapper,
"callback_supported": True,
},
{
"package": "langchain.chains",
"object": "RetrievalQA",
"method": "acall",
"class": "RetrievalQA",
"span_name": "retrieval_qa.workflow",
"wrapper": aworkflow_wrapper,
"callback_supported": True,
},
{
"package": "langchain.prompts.base",
"object": "BasePromptTemplate",
"method": "invoke",
"wrapper": task_wrapper,
"callback_supported": False,
},
{
"package": "langchain.prompts.base",
"object": "BasePromptTemplate",
"method": "ainvoke",
"wrapper": atask_wrapper,
"callback_supported": False,
},
{
"package": "langchain.chat_models.base",
"object": "BaseChatModel",
"method": "generate",
"wrapper": chat_wrapper,
"callback_supported": False,
},
{
"package": "langchain.chat_models.base",
"object": "BaseChatModel",
"method": "agenerate",
"wrapper": achat_wrapper,
"callback_supported": False,
},
{
"package": "langchain.schema",
"object": "BaseOutputParser",
"method": "invoke",
"wrapper": task_wrapper,
"callback_supported": False,
},
{
"package": "langchain.schema",
"object": "BaseOutputParser",
"method": "ainvoke",
"wrapper": atask_wrapper,
"callback_supported": False,
},
{
"package": "langchain.schema.runnable",
"object": "RunnableSequence",
"method": "invoke",
"span_name": "langchain.workflow",
"wrapper": workflow_wrapper,
"callback_supported": False,
},
{
"package": "langchain.schema.runnable",
"object": "RunnableSequence",
"method": "ainvoke",
"span_name": "langchain.workflow",
"wrapper": aworkflow_wrapper,
"callback_supported": False,
},
{
"package": "langchain_core.language_models.llms",
"object": "LLM",
"method": "_generate",
"span_name": "llm.generate",
"wrapper": llm_wrapper,
"callback_supported": False,
},
{
"package": "langchain_core.language_models.llms",
"object": "LLM",
"method": "_agenerate",
"span_name": "llm.generate",
"wrapper": allm_wrapper,
"callback_supported": False,
},
]

Expand All @@ -171,23 +155,39 @@ def instrumentation_dependencies(self) -> Collection[str]:
def _instrument(self, **kwargs):
tracer_provider = kwargs.get("tracer_provider")
tracer = get_tracer(__name__, __version__, tracer_provider)
for wrapped_method in WRAPPED_METHODS:
wrap_package = wrapped_method.get("package")
wrap_object = wrapped_method.get("object")
wrap_method = wrapped_method.get("method")
wrapper = wrapped_method.get("wrapper")
wrap_function_wrapper(
wrap_package,
f"{wrap_object}.{wrap_method}" if wrap_object else wrap_method,
wrapper(tracer, wrapped_method),
)
for module in TO_INSTRUMENT:
try:
wrap_package = module.get("package")
if module.get("callback_supported"):
wrap_class = module.get("class")
wrap_function_wrapper(
wrap_package,
f"{wrap_class}.__init__",
init_wrapper(tracer, module),
)
else:
wrap_object = module.get("object")
wrap_method = module.get("method")
wrapper = module.get("wrapper")
wrap_function_wrapper(
wrap_package,
f"{wrap_object}.{wrap_method}" if wrap_object else wrap_method,
wrapper(tracer, module),
)

except PackageNotFoundError:
pass

def _uninstrument(self, **kwargs):
for wrapped_method in WRAPPED_METHODS:
wrap_package = wrapped_method.get("package")
wrap_object = wrapped_method.get("object")
wrap_method = wrapped_method.get("method")
unwrap(
f"{wrap_package}.{wrap_object}" if wrap_object else wrap_package,
wrap_method,
)
for module in TO_INSTRUMENT:
if not module.get("callback_supported"):
wrap_class = module.get("class")
unwrap(wrap_package, f"{wrap_class}.__init__")
else:
wrap_package = module.get("package")
wrap_object = module.get("object")
wrap_method = module.get("method")
unwrap(
f"{wrap_package}.{wrap_object}" if wrap_object else wrap_package,
wrap_method,
)