Skip to content

Commit

Permalink
core: tracer: remove numeric execution order (#21220)
Browse files Browse the repository at this point in the history
- this hasn't been used in a long time and requires some additional
bookkeeping i'm going to streamline in the next pr
  • Loading branch information
nfcampos committed May 2, 2024
1 parent 6ac6158 commit 47ce8d5
Show file tree
Hide file tree
Showing 9 changed files with 35 additions and 124 deletions.
4 changes: 4 additions & 0 deletions libs/core/langchain_core/callbacks/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -2010,6 +2010,10 @@ def _configure(
if run_tree is not None:
for handler in callback_manager.handlers:
if isinstance(handler, LangChainTracer):
handler.order_map[run_tree.id] = (
run_tree.trace_id,
run_tree.dotted_order,
)
handler.run_map[str(run_tree.id)] = cast(Run, run_tree)
for var, inheritable, handler_class, env_var in _configure_hooks:
create_one = (
Expand Down
84 changes: 17 additions & 67 deletions libs/core/langchain_core/tracers/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
Optional,
Sequence,
Set,
Tuple,
Union,
cast,
)
Expand Down Expand Up @@ -68,6 +69,9 @@ def __init__(
super().__init__(**kwargs)
self._schema_format = _schema_format # For internal use only API will change.
self.run_map: Dict[str, Run] = {}
"""Map of run ID to run. Cleared on run end."""
self.order_map: Dict[UUID, Tuple[UUID, str]] = {}
"""Map of run ID to (trace_id, dotted_order). Cleared when tracer GCed."""

@staticmethod
def _add_child_run(
Expand Down Expand Up @@ -100,67 +104,33 @@ def _start_trace(self, run: Run) -> None:
"""Start a trace for a run."""
current_dotted_order = run.start_time.strftime("%Y%m%dT%H%M%S%fZ") + str(run.id)
if run.parent_run_id:
parent_run = self.run_map.get(str(run.parent_run_id))
if parent_run:
self._add_child_run(parent_run, run)
if hasattr(parent_run, "child_execution_order"):
parent_run.child_execution_order = max(
parent_run.child_execution_order, run.child_execution_order
)
run.trace_id = parent_run.trace_id
if parent_run.dotted_order:
run.dotted_order = (
parent_run.dotted_order + "." + current_dotted_order
)
else:
# Something wrong with tracer parent run has no dotted_order
logger.debug(
f"Parent run with UUID {run.parent_run_id} has no dotted_order."
)
if parent := self.order_map.get(run.parent_run_id):
run.trace_id, run.dotted_order = parent
run.dotted_order += "." + current_dotted_order
if parent_run := self.run_map.get(str(run.parent_run_id)):
self._add_child_run(parent_run, run)
else:
# Something wrong with tracer, parent run not found
# Calculate the trace_id and dotted_order server side
logger.debug(f"Parent run with UUID {run.parent_run_id} not found.")
logger.warning(
f"Parent run {run.parent_run_id} not found for run {run.id}."
" Treating as a root run."
)
run.parent_run_id = None
run.trace_id = run.id
run.dotted_order = current_dotted_order
else:
run.trace_id = run.id
run.dotted_order = current_dotted_order
self.order_map[run.id] = (run.trace_id, run.dotted_order)
self.run_map[str(run.id)] = run
self._on_run_create(run)

def _end_trace(self, run: Run) -> None:
"""End a trace for a run."""
if not run.parent_run_id:
self._persist_run(run)
else:
parent_run = self.run_map.get(str(run.parent_run_id))
if parent_run is None:
logger.debug(f"Parent run with UUID {run.parent_run_id} not found.")
elif (
run.child_execution_order is not None
and getattr(parent_run, "child_execution_order", None) is not None
and run.child_execution_order > parent_run.child_execution_order
):
parent_run.child_execution_order = run.child_execution_order
self.run_map.pop(str(run.id))
self._on_run_update(run)

def _get_execution_order(self, parent_run_id: Optional[str] = None) -> int:
"""Get the execution order for a run."""
if parent_run_id is None:
return 1

parent_run = self.run_map.get(parent_run_id)
if parent_run is None:
logger.debug(f"Parent run with UUID {parent_run_id} not found.")
return 1
if getattr(parent_run, "child_execution_order", None) is None:
logger.debug(
f"Parent run with UUID {parent_run_id} has no child_execution_order."
)
return 1

return parent_run.child_execution_order + 1

def _get_run(
self, run_id: UUID, run_type: Union[str, Set[str], None] = None
) -> Run:
Expand Down Expand Up @@ -205,8 +175,6 @@ def on_chat_model_start(
f"Chat model tracing is not supported in "
f"for {self._schema_format} format."
)
parent_run_id_ = str(parent_run_id) if parent_run_id else None
execution_order = self._get_execution_order(parent_run_id_)
start_time = datetime.now(timezone.utc)
if metadata:
kwargs.update({"metadata": metadata})
Expand All @@ -218,8 +186,6 @@ def on_chat_model_start(
extra=kwargs,
events=[{"name": "start", "time": start_time}],
start_time=start_time,
execution_order=execution_order,
child_execution_order=execution_order,
# WARNING: This is valid ONLY for streaming_events.
# run_type="llm" is what's used by virtually all tracers.
# Changing this to "chat_model" may break triggering on_llm_start
Expand All @@ -244,8 +210,6 @@ def on_llm_start(
**kwargs: Any,
) -> Run:
"""Start a trace for an LLM run."""
parent_run_id_ = str(parent_run_id) if parent_run_id else None
execution_order = self._get_execution_order(parent_run_id_)
start_time = datetime.now(timezone.utc)
if metadata:
kwargs.update({"metadata": metadata})
Expand All @@ -258,8 +222,6 @@ def on_llm_start(
extra=kwargs,
events=[{"name": "start", "time": start_time}],
start_time=start_time,
execution_order=execution_order,
child_execution_order=execution_order,
run_type="llm",
tags=tags or [],
name=name, # type: ignore[arg-type]
Expand Down Expand Up @@ -376,8 +338,6 @@ def on_chain_start(
**kwargs: Any,
) -> Run:
"""Start a trace for a chain run."""
parent_run_id_ = str(parent_run_id) if parent_run_id else None
execution_order = self._get_execution_order(parent_run_id_)
start_time = datetime.now(timezone.utc)
if metadata:
kwargs.update({"metadata": metadata})
Expand All @@ -389,8 +349,6 @@ def on_chain_start(
extra=kwargs,
events=[{"name": "start", "time": start_time}],
start_time=start_time,
execution_order=execution_order,
child_execution_order=execution_order,
child_runs=[],
run_type=run_type or "chain",
name=name, # type: ignore[arg-type]
Expand Down Expand Up @@ -474,8 +432,6 @@ def on_tool_start(
**kwargs: Any,
) -> Run:
"""Start a trace for a tool run."""
parent_run_id_ = str(parent_run_id) if parent_run_id else None
execution_order = self._get_execution_order(parent_run_id_)
start_time = datetime.now(timezone.utc)
if metadata:
kwargs.update({"metadata": metadata})
Expand All @@ -496,8 +452,6 @@ def on_tool_start(
extra=kwargs,
events=[{"name": "start", "time": start_time}],
start_time=start_time,
execution_order=execution_order,
child_execution_order=execution_order,
child_runs=[],
run_type="tool",
tags=tags or [],
Expand Down Expand Up @@ -546,8 +500,6 @@ def on_retriever_start(
**kwargs: Any,
) -> Run:
"""Run when Retriever starts running."""
parent_run_id_ = str(parent_run_id) if parent_run_id else None
execution_order = self._get_execution_order(parent_run_id_)
start_time = datetime.now(timezone.utc)
if metadata:
kwargs.update({"metadata": metadata})
Expand All @@ -560,8 +512,6 @@ def on_retriever_start(
extra=kwargs,
events=[{"name": "start", "time": start_time}],
start_time=start_time,
execution_order=execution_order,
child_execution_order=execution_order,
tags=tags,
child_runs=[],
run_type="retriever",
Expand Down
4 changes: 0 additions & 4 deletions libs/core/langchain_core/tracers/langchain.py
Original file line number Diff line number Diff line change
Expand Up @@ -105,8 +105,6 @@ def on_chat_model_start(
**kwargs: Any,
) -> Run:
"""Start a trace for an LLM run."""
parent_run_id_ = str(parent_run_id) if parent_run_id else None
execution_order = self._get_execution_order(parent_run_id_)
start_time = datetime.now(timezone.utc)
if metadata:
kwargs.update({"metadata": metadata})
Expand All @@ -118,8 +116,6 @@ def on_chat_model_start(
extra=kwargs,
events=[{"name": "start", "time": start_time}],
start_time=start_time,
execution_order=execution_order,
child_execution_order=execution_order,
run_type="llm",
tags=tags,
name=name, # type: ignore[arg-type]
Expand Down
2 changes: 0 additions & 2 deletions libs/core/langchain_core/tracers/schemas.py
Original file line number Diff line number Diff line change
Expand Up @@ -113,8 +113,6 @@ class ToolRun(BaseRun):
class Run(BaseRunV2):
"""Run schema for the V2 API in the Tracer."""

execution_order: int
child_execution_order: int
child_runs: List[Run] = Field(default_factory=list)
tags: Optional[List[str]] = Field(default_factory=list)
events: List[Dict[str, Any]] = Field(default_factory=list)
Expand Down
4 changes: 2 additions & 2 deletions libs/core/langchain_core/tracers/stdout.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,9 +68,9 @@ def get_parents(self, run: Run) -> List[Run]:
def get_breadcrumbs(self, run: Run) -> str:
parents = self.get_parents(run)[::-1]
string = " > ".join(
f"{parent.execution_order}:{parent.run_type}:{parent.name}"
f"{parent.run_type}:{parent.name}"
if i != len(parents) - 1
else f"{parent.execution_order}:{parent.run_type}:{parent.name}"
else f"{parent.run_type}:{parent.name}"
for i, parent in enumerate(parents + [run])
)
return string
Expand Down

Large diffs are not rendered by default.

2 changes: 0 additions & 2 deletions libs/core/tests/unit_tests/runnables/test_runnable.py
Original file line number Diff line number Diff line change
Expand Up @@ -134,8 +134,6 @@ def _copy_run(self, run: Run) -> Run:
self.uuids_map[run.parent_run_id] if run.parent_run_id else None
),
"child_runs": [self._copy_run(child) for child in run.child_runs],
"execution_order": None,
"child_execution_order": None,
"trace_id": self._replace_uuid(run.trace_id) if run.trace_id else None,
"dotted_order": new_dotted_order,
"inputs": (
Expand Down

0 comments on commit 47ce8d5

Please sign in to comment.