Skip to content

Commit

Permalink
Merge pull request #24 from e2b-dev/v0.2
Browse files Browse the repository at this point in the history
Agent protocol spec reworked (v0.2)
  • Loading branch information
jakubno committed Aug 7, 2023
2 parents e29b884 + e9d19f9 commit fc32ea0
Show file tree
Hide file tree
Showing 14 changed files with 523 additions and 195 deletions.
1 change: 1 addition & 0 deletions agent/python/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -6,3 +6,4 @@ generate:
--output agent_protocol
mv agent_protocol/main.py agent_protocol/server.py
rm -rf agent_protocol/routers
rm agent_protocol/dependencies.py
65 changes: 49 additions & 16 deletions agent/python/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,28 +11,61 @@ pip install agent-protocol
Then add the following code to your agent:

```python
from agent_protocol import (
Agent,
StepResult,
StepHandler,
)
from agent_protocol import Agent, Step, Task

async def task_handler(task_input) -> StepHandler:
# TODO: Initialize code for the agent task.
async def step_handler(step_input) -> StepResult:
# TODO: Execute code for the agent step.
# The step could for example be a single iteration of a ReAct loop.

# TODO: Return the result of the step.
return StepResult(
output=...,
)
async def task_handler(task: Task) -> None:
# TODO: Create initial step(s) for the task
await Agent.db.create_step(task.task_id, ...)


async def step_handler(step: Step) -> Step:
# TODO: handle next step
if step.name == "print":
print(step.input)
step.is_last = True

step.output = "Output from the agent"
return step

return step_handler

if __name__ == "__main__":
# Add the task handler and start the server
Agent.handle_task(task_handler).start()
Agent.setup_agent(task_handler, step_handler).start()
```

## Customization

### Database

By default the SDK stores data in memory. You can customize the database by setting db to your own database object.

```python
Agent.db = your_database
```

The database object must implement the methods from [db.py](./agent/python/db.py).

### Routes

You can also add your own routes to the server. For example:

```python
from agent_protocol import Agent, router
from fastapi import APIRouter

my_router = APIRouter()


@my_router.get("/hello")
async def hello():
return {"hello": "world"}

my_router.include_router(router)

task_handler = ...
step_handler = ...
Agent.setup_agent(task_handler, step_handler).start(router=my_router)
```

## Usage
Expand Down
18 changes: 17 additions & 1 deletion agent/python/agent_protocol/__init__.py
Original file line number Diff line number Diff line change
@@ -1 +1,17 @@
from .agent import Agent, StepHandler, StepResultWithDefaults as StepResult
from .agent import Agent, StepHandler, TaskHandler, base_router as router
from .models import Artifact, StepRequestBody, TaskRequestBody
from .db import Step, Task, TaskDB


__all__ = [
"Agent",
"Artifact",
"Step",
"StepHandler",
"Task",
"TaskDB",
"StepRequestBody",
"TaskHandler",
"TaskRequestBody",
"router",
]
171 changes: 102 additions & 69 deletions agent/python/agent_protocol/agent.py
Original file line number Diff line number Diff line change
@@ -1,93 +1,85 @@
import asyncio
import os

import uuid

from fastapi import APIRouter
from fastapi.responses import FileResponse
from hypercorn.asyncio import serve
from hypercorn.config import Config
from typing import Awaitable, Callable, List, Optional, Tuple, Any

from typing import Awaitable, Callable, List, Optional

from .db import InMemoryTaskDB, TaskDB
from .server import app
from .models import (
Task,
TaskRequestBody,
Step,
StepInput,
StepRequestBody,
TaskInput,
StepResult,
Artifact,
Task,
Status,
)


StepHandler = Callable[[StepInput | None], Awaitable[StepResult]]
TaskHandler = Callable[[TaskInput | None], Awaitable[StepHandler]]
StepHandler = Callable[[Step], Awaitable[Step]]
TaskHandler = Callable[[Task], Awaitable[None]]

tasks: List[Tuple[Task, StepHandler]] = []
steps: List[Step] = []

task_handler: Optional[TaskHandler]
_task_handler: Optional[TaskHandler]
_step_handler: Optional[StepHandler]


class StepResultWithDefaults(StepResult):
output: Any
artifacts: List[Artifact] = []
is_last: bool = False
base_router = APIRouter()


@app.post("/agent/tasks", response_model=Task, tags=["agent", "tasks"])
@base_router.post("/agent/tasks", response_model=Task, tags=["agent"])
async def create_agent_task(body: TaskRequestBody | None = None) -> Task:
"""
Creates a task for the agent.
"""
if not task_handler:
if not _task_handler:
raise Exception("Task handler not defined")

step_handler = await task_handler(body.input if body else None)
task = Task(
task_id=str(uuid.uuid4()),
task = await Agent.db.create_task(
input=body.input if body else None,
artifacts=[],
additional_input=body.additional_input if body else None,
)
tasks.append((task, step_handler))
await _task_handler(task)

return task


@app.get("/agent/tasks", response_model=List[str], tags=["agent", "tasks"])
@base_router.get("/agent/tasks", response_model=List[str], tags=["agent"])
async def list_agent_tasks_ids() -> List[str]:
"""
List all tasks that have been created for the agent.
"""
return [t[0].task_id for t in tasks]
return [task.task_id for task in await Agent.db.list_tasks()]


@app.get("/agent/tasks/{task_id}", response_model=Task, tags=["agent", "tasks"])
@base_router.get("/agent/tasks/{task_id}", response_model=Task, tags=["agent"])
async def get_agent_task(task_id: str) -> Task:
"""
Get details about a specified agent task.
"""
task = next(filter(lambda t: t[0].task_id == task_id, tasks), None)
if not task:
raise Exception(f"Task with id {task_id} not found")
return task[0]
return await Agent.db.get_task(task_id)


@app.get(
@base_router.get(
"/agent/tasks/{task_id}/steps",
response_model=List[str],
tags=["agent", "tasks", "steps"],
tags=["agent"],
)
async def list_agent_task_steps(task_id: str) -> List[str]:
"""
List all steps for the specified task.
"""
return [t.step_id for t in steps if t.task_id == task_id]
task = await Agent.db.get_task(task_id)
return [s.step_id for s in task.steps]


@app.post(
@base_router.post(
"/agent/tasks/{task_id}/steps",
response_model=Step,
tags=["agent", "tasks", "steps"],
tags=["agent"],
)
async def execute_agent_task_step(
task_id: str,
Expand All @@ -96,63 +88,104 @@ async def execute_agent_task_step(
"""
Execute a step in the specified agent task.
"""
task = next(filter(lambda t: t[0].task_id == task_id, tasks), None)
if not task:
raise Exception(f"Task with id {task_id} not found")
task = await Agent.db.get_task(task_id)
step = next(filter(lambda x: x.status == Status.created, task.steps), None)

handler = task[1]
result = await handler(body.input if body else None)
if not step:
raise Exception("No steps to execute")

step = Step(
task_id=task_id,
step_id=str(uuid.uuid4()),
input=body.input if body else None,
output=result.output,
artifacts=result.artifacts,
is_last=result.is_last,
)
step.input = body.input if body else None
step.additional_input = body.additional_input if body else None

step = await _step_handler(step)

step.status = Status.completed

if step.artifacts:
if task[0].artifacts is None:
task[0].artifacts = step.artifacts
else:
task[0].artifacts.extend(step.artifacts)
task.artifacts.extend(step.artifacts)

steps.append(step)
return step


@app.get(
@base_router.get(
"/agent/tasks/{task_id}/steps/{step_id}",
response_model=Step,
tags=["agent", "tasks", "steps"],
tags=["agent"],
)
async def get_agent_task_step(task_id: str, step_id: str = ...) -> Step:
"""
Get details about a specified task step.
"""
step = next(
filter(
lambda t: t.task_id == task_id and t.step_id == step_id,
steps,
),
None,
return await Agent.db.get_step(task_id, step_id)


@base_router.get(
"/agent/tasks/{task_id}/artifacts",
response_model=List[Artifact],
tags=["agent"],
)
async def list_agent_task_artifacts(task_id: str) -> List[Artifact]:
"""
List all artifacts for the specified task.
"""
task = await Agent.db.get_task(task_id)
return task.artifacts


@base_router.get(
"/agent/tasks/{task_id}/artifacts/{artifact_id}",
tags=["agent"],
)
async def get_agent_task_artifacts(task_id: str, artifact_id: str) -> FileResponse:
"""
Download the specified artifact.
"""
artifact = await Agent.db.get_artifact(task_id, artifact_id)
path = Agent.get_artifact_path(task_id, artifact)
return FileResponse(
path=path, media_type="application/octet-stream", filename=artifact.file_name
)
if not step:
raise Exception(f"Step with task id {task_id} and step id {step_id} not found")
return step


class Agent:
db: TaskDB = InMemoryTaskDB()
workspace: str = os.getenv("AGENT_WORKSPACE", "workspace")

@staticmethod
def handle_task(handler: Callable[[TaskInput | None], Awaitable[StepHandler]]):
global task_handler
task_handler = handler
def setup_agent(task_handler: TaskHandler, step_handler: StepHandler):
"""
Set the agent's task and step handlers.
"""
global _task_handler
_task_handler = task_handler

global _step_handler
_step_handler = step_handler

return Agent

@staticmethod
def start(port: int = 8000):
def get_workspace(task_id: str) -> str:
"""
Get the workspace path for the specified task.
"""
return os.path.join(os.getcwd(), Agent.workspace, task_id)

@staticmethod
def get_artifact_path(task_id: str, artifact: Artifact) -> str:
"""
Get the artifact path for the specified task and artifact.
"""
workspace_path = Agent.get_workspace(task_id)
relative_path = artifact.relative_path or ""
return os.path.join(workspace_path, relative_path, artifact.file_name)

@staticmethod
def start(port: int = 8000, router: APIRouter = base_router):
"""
Start the agent server.
"""
config = Config()
config.bind = [f"localhost:{port}"] # As an example configuration setting
app.include_router(router)
asyncio.run(serve(app, config))
Loading

0 comments on commit fc32ea0

Please sign in to comment.