Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Properly pass config from async local storage into conditional edges #223

Merged
merged 4 commits into from
Jun 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
66 changes: 65 additions & 1 deletion langgraph/src/tests/tracing.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,13 @@ it("should pass config through if importing from the primary entrypoint", async
return { messages: [res] };
})
.addEdge(START, "testnode")
.addEdge("testnode", END)
.addConditionalEdges("testnode", async (_state) => {
const model = new FakeToolCallingChatModel({
responses: [new AIMessage("hey!")],
}).withConfig({ runName: "conditional_edge_call" });
await model.invoke("testing but should be traced");
return END;
})
.compile();

const eventStream = graph.streamEvents({ messages: [] }, { version: "v2" });
Expand Down Expand Up @@ -161,6 +167,64 @@ it("should pass config through if importing from the primary entrypoint", async
tags: ["seq:step:2", "langsmith:hidden"],
metadata: {},
},
{
event: "on_chain_start",
data: {
input: {
input: undefined,
},
},
name: "func",
tags: ["seq:step:3"],
run_id: expect.any(String),
metadata: {},
},
{
event: "on_chat_model_start",
data: {
input: {
messages: [[new HumanMessage("testing but should be traced")]],
},
},
name: "conditional_edge_call",
tags: [],
run_id: expect.any(String),
metadata: {
ls_model_type: "chat",
ls_stop: undefined,
},
},
{
event: "on_chat_model_end",
data: {
output: new AIMessage("hey!"),
input: {
messages: [[new HumanMessage("testing but should be traced")]],
},
},
run_id: expect.any(String),
name: "conditional_edge_call",
tags: [],
metadata: {
ls_model_type: "chat",
ls_stop: undefined,
},
},
{
event: "on_chain_end",
data: {
output: {
output: undefined,
},
input: {
input: undefined,
},
},
run_id: expect.any(String),
name: "func",
tags: ["seq:step:3"],
metadata: {},
},
{
event: "on_chain_end",
data: {
Expand Down
35 changes: 28 additions & 7 deletions langgraph/src/utils.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
import { CallbackManagerForChainRun } from "@langchain/core/callbacks/manager";
import {
mergeConfigs,
patchConfig,
Runnable,
RunnableConfig,
} from "@langchain/core/runnables";
import { AsyncLocalStorageProviderSingleton } from "@langchain/core/singletons";

// eslint-disable-next-line @typescript-eslint/no-explicit-any
export interface RunnableCallableArgs extends Partial<any> {
Expand Down Expand Up @@ -37,31 +40,49 @@ export class RunnableCallable<I = unknown, O = unknown> extends Runnable<I, O> {
this.recurse = fields.recurse ?? this.recurse;
}

protected async _tracedInvoke(
input: I,
config?: Partial<RunnableConfig>,
runManager?: CallbackManagerForChainRun
) {
return new Promise<O>((resolve, reject) => {
const childConfig = patchConfig(config, {
callbacks: runManager?.getChild(),
});
void AsyncLocalStorageProviderSingleton.getInstance().run(
childConfig,
async () => {
try {
const output = await this.func(input, childConfig);
resolve(output);
} catch (e) {
reject(e);
}
}
);
});
}

async invoke(
// eslint-disable-next-line @typescript-eslint/no-explicit-any
input: any,
options?: Partial<RunnableConfig> | undefined
// eslint-disable-next-line @typescript-eslint/no-explicit-any
): Promise<any> {
if (this.func === undefined) {
return this.invoke(input, options);
}

// eslint-disable-next-line @typescript-eslint/no-explicit-any
let returnValue: any;

if (this.trace) {
returnValue = await this._callWithConfig(
this.func,
this._tracedInvoke,
input,
mergeConfigs(this.config, options)
);
} else {
returnValue = await this.func(input, mergeConfigs(this.config, options));
}

// eslint-disable-next-line no-instanceof/no-instanceof
if (returnValue instanceof Runnable && this.recurse) {
if (Runnable.isRunnable(returnValue) && this.recurse) {
return await returnValue.invoke(input, options);
}

Expand Down
Loading