How to stream token of agent response in agent supervisor? #137

Open

chatgptguru opened this issue Feb 22, 2024 · 14 comments

Checked other resources

  • I added a very descriptive title to this issue.
  • I searched the LangChain documentation with the integrated search.
  • I used the GitHub search to find a similar question and didn't find it.
  • I am sure that this is a bug in LangChain rather than my code.

Example Code

import os
from langchain.agents import AgentExecutor, create_openai_tools_agent
from langchain_core.messages import BaseMessage, HumanMessage
from langchain_openai import ChatOpenAI
from langchain.output_parsers.openai_functions import JsonOutputFunctionsParser
from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder
import operator
import json
from typing import Annotated, Any, Dict, List, Optional, Sequence, TypedDict
import functools
from langgraph.graph import StateGraph, END
from langchain_community.utilities import SerpAPIWrapper
from langchain.agents import Tool
from utils import toggle_case,sort_string

os.environ["OPENAI_API_KEY"] = "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"
os.environ["SERPER_API_KEY"] = "c3b73653f4256d5f2b4b5cf4e6fa438d736de7a4717b0fe06d92df0f30fbd3ce"

class AgentSupervisor:
    def __init__(self, llm):
        self.llm = llm


def getAgentSupervisor():
    search = SerpAPIWrapper(serpapi_api_key=os.environ["SERPER_API_KEY"])
    tools = [
        Tool(
            name="Search",
            func=search.run,
            description="useful for when you need to answer questions about current events",
        ),
        Tool(
            name="Toogle_Case",
            func=lambda word: toggle_case(word),
            description="use when you want to convert the letter to uppercase or lowercase",
        ),
        Tool(
            name="Sort_String",
            func=lambda string: sort_string(string),
            description="use when you want to sort a string alphabetically",
        ),
    ]

    def create_agent(llm: ChatOpenAI, tools: list, system_prompt: str):
        # Each worker node will be given a name and some tools.
        prompt = ChatPromptTemplate.from_messages(
            [
                (
                    "system",
                    system_prompt,
                ),
                MessagesPlaceholder(variable_name="messages"),
                MessagesPlaceholder(variable_name="agent_scratchpad"),
            ]
        )
        agent = create_openai_tools_agent(llm, tools, prompt)
        executor = AgentExecutor(agent=agent, tools=tools)
        return executor

    def agent_node(state, agent, name):
        result = agent.invoke(state)
        return {"messages": [HumanMessage(content=result["output"], name=name)]}

    members = ["AIAssistant", "Coder"]
    system_prompt = (
        "You are a supervisor tasked with managing a conversation between the"
        " following workers:  {members}. Given the following user request,"
        " respond with the worker to act next. Each worker will perform a"
        " task and respond with their results and status. When finished,"
        " respond with FINISH."
    )
    # Our team supervisor is an LLM node. It just picks the next agent to process
    # and decides when the work is completed
    options = ["FINISH"] + members
    # Using openai function calling can make output parsing easier for us
    function_def = {
        "name": "route",
        "description": "Select the next role.",
        "parameters": {
            "title": "routeSchema",
            "type": "object",
            "properties": {
                "next": {
                    "title": "Next",
                    "anyOf": [
                        {"enum": options},
                    ],
                }
            },
            "required": ["next"],
        },
    }
    prompt = ChatPromptTemplate.from_messages(
        [
            ("system", system_prompt),
            MessagesPlaceholder(variable_name="messages"),
            (
                "system",
                "Given the conversation above, who should act next?"
                " Or should we FINISH? Select one of: {options}",
            ),
        ]
    ).partial(options=str(options), members=", ".join(members))

    llm = ChatOpenAI(model="gpt-4-1106-preview", streaming=True)

    supervisor_chain = (
        prompt
        | llm.bind_functions(functions=[function_def], function_call="route")
        | JsonOutputFunctionsParser()
    )

    class AgentState(TypedDict):
        # The annotation tells the graph that new messages will always
        # be added to the current states
        messages: Annotated[Sequence[BaseMessage], operator.add]
        # The 'next' field indicates where to route to next
        next: str


    research_agent = create_agent(llm, tools, "You are an AI assistant that provides personalized answers to people.")
    research_node = functools.partial(agent_node, agent=research_agent, name="AIAssistant")

    # NOTE: THIS PERFORMS ARBITRARY CODE EXECUTION. PROCEED WITH CAUTION
    code_agent = create_agent(
        llm,
        tools,
        "You may generate safe python code to analyze data and generate charts using matplotlib.",
    )
    code_node = functools.partial(agent_node, agent=code_agent, name="Coder")

    workflow = StateGraph(AgentState)
    workflow.add_node("AIAssistant", research_node)
    workflow.add_node("Coder", code_node)
    workflow.add_node("supervisor", supervisor_chain)

    for member in members:
        # We want our workers to ALWAYS "report back" to the supervisor when done
        workflow.add_edge(member, "supervisor")
    # The supervisor populates the "next" field in the graph state
    # which routes to a node or finishes
    conditional_map = {k: k for k in members}
    conditional_map["FINISH"] = END
    workflow.add_conditional_edges("supervisor", lambda x: x["next"], conditional_map)
    # Finally, add entrypoint
    workflow.set_entry_point("supervisor")

    graph = workflow.compile()
    return graph

# NOTE: `question` and `websocket` come from the surrounding async websocket handler (not shown).
agent_supervisor = getAgentSupervisor()
agent_name = ''
for s in agent_supervisor.stream(
    {"messages": [HumanMessage(content=question)]},
    {"recursion_limit": 100},
):
    if "end" not in s:
        if 'supervisor' in s:
            agent_name = s['supervisor']['next']
            if agent_name != "FINISH":
                await websocket.send_text(json.dumps({"token": "AgentName:" + agent_name + "\n"}))
                print(agent_name)
        if agent_name in s:
            content = s[agent_name]['messages'][0].content
            await websocket.send_text(json.dumps({"token": "Response:" + content + "\n"}))
            print(content)
    print("----")

Error Message and Stack Trace (if applicable)

No error. The code runs and produces output, but it emits the full agent response at once; I need a way to stream the agent response token by token.

Description

I am trying to stream the tokens of the agent response in an agent supervisor.
Right now it outputs the agent name and the full agent response; I want to stream the tokens of the agent response instead.

System Info

platform: windows
python version: 3.11.2
langchain version: latest version

@sploithunter

It should be astream_log(), but it is broken in some agents. I have filed an issue for the breakage.

@sploithunter

try this:

graph = workflow.compile()

async def main():
    async for output in graph.astream_log(
        {
            "messages": [
                HumanMessage(content="Code hello world and print it to the terminal")
            ]
        }, include_types=["llm"]
    ):
        for op in output.ops:
            if op["path"] == "/streamed_output/-":
                # this is the output from .stream()
                ...
            elif op["path"].startswith("/logs/") and op["path"].endswith(
                "/streamed_output/-"
            ):
                # because we chose to only include LLMs, these are LLM tokens
                print(op["value"])
if __name__ == "__main__":
    import asyncio
    asyncio.run(main())

@chatgptguru (Author) commented Feb 27, 2024

Thank you for your reply, but I am facing the issue shown in the screenshot below. Also, could you please advise how to add conversational memory once the streaming issue is fixed?
[screenshot of the error]

@sploithunter

That is the issue I have filed. It is not fixed yet. It appears in many graphs, but not all.

@chatgptguru (Author)

Thank you for your response. Have you ever implemented conversational memory in an agent supervisor?

@Sulomus commented Mar 12, 2024

I have faced the same issue this week - any luck, guys?

Streaming doesn't seem to work at all in the langgraph library :(

@chatgptguru (Author) commented Mar 12, 2024 via email

@dmitryrPlanner5D

It seems I have figured out how to fix token streaming.

I am not sure about your code, because it is not async, but I was using this notebook and the graph was not streaming tokens as expected. After debugging the internals of langgraph, I figured out that you need to add an extra config parameter to your LLM-calling function and pass it through to the LLM:

# Define the function that calls the model
async def call_model(messages, config):
    response = await model.ainvoke(messages, config=config)
    # We return a list, because this will get added to the existing list
    return response

That way langgraph is able to pass its callbacks to the LLM to handle its stream, and I got my token stream.

Again, I am not sure about your code, because the tutorials suggest using graph.astream_events rather than graph.stream.
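
For example, a minimal sketch of consuming token chunks via graph.astream_events could look like this (assuming a compiled graph and an async context; tokens only show up once the node forwards config to the model as described above):

from langchain_core.messages import HumanMessage

async def stream_tokens(graph, question: str) -> None:
    # Chat-model token chunks arrive as "on_chat_model_stream" events.
    async for event in graph.astream_events(
        {"messages": [HumanMessage(content=question)]},
        version="v1",
    ):
        if event["event"] == "on_chat_model_stream":
            chunk = event["data"]["chunk"]  # an AIMessageChunk
            if chunk.content:
                print(chunk.content, end="", flush=True)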

@unfailingsalvage1448

> It seems I have figured out how to fix token streaming. […] you need to add an extra config parameter to your LLM-calling function and pass it through to the LLM […]

Thanks, this worked for me with AzureChatOpenAI.

@PlebeiusGaragicus

@dmitryrPlanner5D - I spent a few hours trying to solve this and you figured it out for me! Passing the config to the LLM enables chunk streaming inside graphs!!! Wonderful!!!

If this were nostr I would zap you!

@usersina

I have the same problem with this setup:

# List of members participating in the conversation
members = ["MongoDBAgent"]

# Setup the nodes
supervisor_node = supervisor_factory.setup_node(members)
mongo_node = mongo_factory.setup_node(config)

# Setup the graph and add the nodes
workflow = StateGraph(AgentState)
workflow.add_node("Supervisor", supervisor_node)
workflow.add_node("MongoDBAgent", mongo_node)

# Define the edges
for member in members:
    workflow.add_edge(member, "Supervisor")

# The supervisor populates the "next" state, hence routing the conversation
# A conditional map is a dict that maps the output of the supervisor to the next node
# e.g. {'MongoDBAgent': 'MongoDBAgent', 'FINISH': END}
conditional_map = {member: member for member in members}
conditional_map["FINISH"] = END
print(conditional_map)

workflow.add_conditional_edges("Supervisor", lambda state: state["next"], conditional_map)

# Entry point
workflow.set_entry_point("Supervisor")

# Compile
graph = workflow.compile()

with the following factory method for MongoDB:

def create_agent(
    llm: ChatOpenAI, tools: Sequence[BaseTool], system_prompt: str
) -> AgentExecutor:
    prompt = ChatPromptTemplate.from_messages(
        [
            (
                "system",
                system_prompt,
            ),
            MessagesPlaceholder(variable_name="messages"),
            MessagesPlaceholder(variable_name="agent_scratchpad"),
        ]
    )
    agent = create_openai_tools_agent(llm, tools, prompt)
    executor = AgentExecutor(agent=agent, tools=tools)  # type: ignore https://github.com/langchain-ai/langchain/issues/13075
    return executor


def setup_agent(config: Config) -> AgentExecutor:
    llm = ChatOpenAI(model=config.model, streaming=config.streaming)
    return create_agent(llm, _get_tools(config), system_message)


def setup_node(config: Config) -> functools.partial[dict[str, list[HumanMessage]]]:
    mongo_agent = setup_agent(config)
    mongo_node = functools.partial(agent_node, agent=mongo_agent, name=NODE_NAME)

    return mongo_node

Calling my graph with:

async for output in graph.astream_log(inputs, include_types=["llm"]):
    # astream_log() yields the requested logs (here LLMs) in JSONPatch format
    for op in output.ops:
        if op["path"] == "/streamed_output/-":
            # this is the output from .stream()
            ...
        elif op["path"].startswith("/logs/") and op["path"].endswith(
            "/streamed_output/-"
        ):
            # because we chose to only include LLMs, these are LLM tokens
            print(op["value"])

This gives the following error:

KeyError: 'MongoMongoDBMongoDBAgent'

I can't see where I can add the config.

> It seems I have figured out how to fix token streaming. […] you need to add an extra config parameter to your LLM-calling function and pass it through to the LLM […]

@dmitryrPlanner5D

Hi @usersina,
Regarding your KeyError: it seems to be related to your MongoDB setup rather than to LangGraph. I am not familiar with Mongo, so I can't help there.

Regarding config: in my solution I suggest adding a config parameter to the function you pass to workflow.add_node. In your case that is the function produced by mongo_factory.setup_node(config). After the function is built (and after all the functools.partial calls) it should take only two parameters: the graph state and config. You can then pass this config to llm.invoke to enable streaming.

Also, I see that you already have a parameter named config used for a different purpose, so be careful not to confuse it with the LangChain config I am suggesting here.
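
To make this concrete, here is a minimal sketch (with hypothetical names) of what the worker node could look like after that change. The key point is that the node accepts LangChain's RunnableConfig as its second argument, distinct from your own Config object, and forwards it to the invoke call so callbacks, and with them token streaming, propagate:

from langchain_core.messages import HumanMessage
from langchain_core.runnables import RunnableConfig

async def agent_node(state, config: RunnableConfig, *, agent, name: str):
    # Forwarding `config` lets LangGraph attach its callbacks to this call,
    # which is what enables token-by-token streaming from the chat model.
    result = await agent.ainvoke(state, config=config)
    return {"messages": [HumanMessage(content=result["output"], name=name)]}

# `agent` and `name` are keyword-only, so after functools.partial the node
# still exposes exactly (state, config) to LangGraph, e.g.:
# mongo_node = functools.partial(agent_node, agent=mongo_agent, name="MongoDBAgent")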

@usersina commented Apr 23, 2024

@dmitryrPlanner5D, thanks for the suggestion!

The MongoDB setup is just a simple tool that lists the collection names in a MongoDB database, so nothing crazy. I did follow what you said, and streaming is working now. However, I still see the KeyError when executing:

inputs = {"messages": [HumanMessage(content="How many collections do we have?")]}
async for output in graph.astream_log(inputs, include_types=["llm"]):
    for op in output.ops:
        print(op)

{'op': 'replace', 'path': '', 'value': {'id': '1e88305f-e06c-4f9f-8120-20208defbb88', 'streamed_output': [], 'final_output': None, 'logs': {}, 'name': 'LangGraph', 'type': 'chain'}}
{'op': 'add', 'path': '/logs/ChatOpenAI', 'value': {'id': '69000165-10d9-4828-962d-653e95bbc436', 'name': 'ChatOpenAI', 'type': 'llm', 'tags': ['seq:step:2'], 'metadata': {}, 'start_time': '2024-04-23T13:54:01.044+00:00', 'streamed_output': [], 'streamed_output_str': [], 'final_output': None, 'end_time': None}}
{'op': 'add', 'path': '/logs/ChatOpenAI/streamed_output_str/-', 'value': ''}
{'op': 'add', 'path': '/logs/ChatOpenAI/streamed_output/-', 'value': AIMessageChunk(content='', additional_kwargs={'function_call': {'arguments': '', 'name': 'route'}})}
{'op': 'add', 'path': '/logs/ChatOpenAI/streamed_output_str/-', 'value': ''}
{'op': 'add', 'path': '/logs/ChatOpenAI/streamed_output/-', 'value': AIMessageChunk(content='', additional_kwargs={'function_call': {'arguments': '{"', 'name': ''}})}
{'op': 'add', 'path': '/logs/ChatOpenAI/streamed_output_str/-', 'value': ''}
{'op': 'add', 'path': '/logs/ChatOpenAI/streamed_output/-', 'value': AIMessageChunk(content='', additional_kwargs={'function_call': {'arguments': 'next', 'name': ''}})}
{'op': 'add', 'path': '/logs/ChatOpenAI/streamed_output_str/-', 'value': ''}
{'op': 'add', 'path': '/logs/ChatOpenAI/streamed_output/-', 'value': AIMessageChunk(content='', additional_kwargs={'function_call': {'arguments': '":"', 'name': ''}})}
{'op': 'add', 'path': '/logs/ChatOpenAI/streamed_output_str/-', 'value': ''}
{'op': 'add', 'path': '/logs/ChatOpenAI/streamed_output/-', 'value': AIMessageChunk(content='', additional_kwargs={'function_call': {'arguments': 'Mongo', 'name': ''}})}
{'op': 'add', 'path': '/logs/ChatOpenAI/streamed_output_str/-', 'value': ''}
{'op': 'add', 'path': '/logs/ChatOpenAI/streamed_output/-', 'value': AIMessageChunk(content='', additional_kwargs={'function_call': {'arguments': 'DB', 'name': ''}})}
{'op': 'add', 'path': '/logs/ChatOpenAI/streamed_output_str/-', 'value': ''}
{'op': 'add', 'path': '/logs/ChatOpenAI/streamed_output/-', 'value': AIMessageChunk(content='', additional_kwargs={'function_call': {'arguments': 'Agent', 'name': ''}})}
{'op': 'add', 'path': '/logs/ChatOpenAI/streamed_output_str/-', 'value': ''}
{'op': 'add', 'path': '/logs/ChatOpenAI/streamed_output/-', 'value': AIMessageChunk(content='', additional_kwargs={'function_call': {'arguments': '"}', 'name': ''}})}
{'op': 'add', 'path': '/logs/ChatOpenAI/streamed_output_str/-', 'value': ''}
{'op': 'add', 'path': '/logs/ChatOpenAI/streamed_output/-', 'value': AIMessageChunk(content='')}
{'op': 'add', 'path': '/logs/ChatOpenAI/final_output', 'value': {'generations': [[{'text': '', 'generation_info': {'finish_reason': 'stop'}, 'type': 'ChatGenerationChunk', 'message': AIMessageChunk(content='', additional_kwargs={'function_call': {'arguments': '{"next":"MongoDBAgent"}', 'name': 'route'}}, response_metadata={'finish_reason': 'stop'}, id='run-69000165-10d9-4828-962d-653e95bbc436')}]], 'llm_output': None, 'run': None}}
{'op': 'add', 'path': '/logs/ChatOpenAI/end_time', 'value': '2024-04-23T13:54:02.193+00:00'}
{'op': 'add', 'path': '/streamed_output/-', 'value': {'Supervisor': {'next': 'MongoMongoDBMongoDBAgent'}}}
{'op': 'replace', 'path': '/final_output', 'value': {'Supervisor': {'next': 'MongoMongoDBMongoDBAgent'}}}

The reason for the KeyError is that the graph is looking up the next node to run, which in my case is named MongoDBAgent. But if you look at the logs, it is trying to run "MongoMongoDBMongoDBAgent", which doesn't make sense.

This is most definitely because my AgentState looks like this:

class AgentState(TypedDict):
    """
    The agent state is the input to each node in the graph
    """

    messages: Annotated[Sequence[BaseMessage], operator.add]
    """
    The annotation tells the graph that new messages will always be added
    to the current state
    """

    next: str
    """
    The next node to execute
    """

The culprit, I think, is the operator.add in the Annotated messages. I did see a similar issue that I will be diving into.

@usersina

I found the issue and the solution.
