[candidate_isaac] opengpts: ingest progress bar and python eval tool #326

Draft · wants to merge 2 commits into main
16 changes: 15 additions & 1 deletion backend/app/api/runs.py
@@ -78,7 +78,21 @@ async def stream_run(
"""Create a run."""
input_, config = await _run_input_and_config(payload, user["user_id"])

return EventSourceResponse(to_sse(astream_state(agent, input_, config)))
# DEBUG: when being rate limited, skip generating actual events.
async def fake_event_stream():
import orjson

for x in range(3):
yield {
"event": "metadata",
"data": orjson.dumps(
{"run_id": f"irothschild debug fake run id {x}"}
).decode(),
}

return EventSourceResponse(fake_event_stream())

# return EventSourceResponse(to_sse(astream_state(agent, input_, config)))


@router.get("/input_schema")
1 change: 1 addition & 0 deletions backend/app/ingest.py
@@ -6,6 +6,7 @@
This code should be agnostic to how the blob got generated; i.e., it does not
know about server/uploading etc.
"""

from typing import List

from langchain.text_splitter import TextSplitter
26 changes: 25 additions & 1 deletion backend/app/server.py
@@ -1,18 +1,22 @@
import asyncio
import logging
import os
from pathlib import Path

import orjson
from fastapi import FastAPI, Form, UploadFile
from fastapi.exceptions import HTTPException
from fastapi.responses import StreamingResponse
from fastapi.staticfiles import StaticFiles


import app.storage as storage
from app.api import router as api_router
from app.auth.handlers import AuthedUser
from app.lifespan import lifespan
from app.upload import ingest_runnable


logger = logging.getLogger(__name__)

app = FastAPI(title="OpenGPTs API", lifespan=lifespan)
@@ -44,7 +48,27 @@ async def ingest_files(
if thread is None:
raise HTTPException(status_code=404, detail="Thread not found.")

return ingest_runnable.batch([file.file for file in files], config)
if config["configurable"].get("show_progress_bar"):
Contributor: would maybe be better to branch based on request headers? eg the Accept header

Author: Using the Accept header here makes sense.
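A minimal sketch of that branch, assuming the route handler gains a `request: fastapi.Request` parameter and that clients advertise streaming support with `Accept: text/event-stream` (both are assumptions, not part of this diff):

    # Sketch: branch on the Accept header instead of a config flag.
    # Assumes `request: Request` was added to the route signature.
    if "text/event-stream" in request.headers.get("accept", ""):
        ...  # client can consume a stream; emit progress as files finish
    else:
        return ingest_runnable.batch([file.file for file in files], config)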

ingest_runnable.abatch_as_completed([file.file for file in files], config)
Contributor: I assume this shouldn't be here?

Author: Yes sorry, this is a mistake.


# The return type of the IngestRunnable is not compatible with the
# FastAPI StreamingResponse (the data must have an `.encode()`
# method). The results are (int, list) tuples, so let's use orjson to
# serialize each tuple to bytes.
async def decode_ingestion_response(ingest_generator):
async for x in ingest_generator:
yield orjson.dumps(x)
await asyncio.sleep(3) # Debug: to demonstrate streaming

return StreamingResponse(
Contributor: ideally we'd use a more standard streaming format, eg Server Sent Events

Author: I'm not familiar with Server Sent Events, but just for completeness, what's the concern with fastapi.StreamingResponse()?
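The concern is mostly framing and client support: a raw StreamingResponse is an undelimited byte stream, so the client must guess where one JSON record ends and the next begins, whereas SSE gives every message an explicit boundary and can be consumed with the browser-native EventSource API. A rough sketch of SSE here, reusing the EventSourceResponse that runs.py already imports from sse_starlette (the "progress" event name is an illustrative assumption):

    async def progress_events():
        # abatch_as_completed yields (index, result) tuples as each file finishes
        async for idx, result in ingest_runnable.abatch_as_completed(
            [file.file for file in files], config
        ):
            yield {"event": "progress", "data": orjson.dumps([idx, result]).decode()}

    return EventSourceResponse(progress_events())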

decode_ingestion_response(
ingest_runnable.abatch_as_completed(
[file.file for file in files], config
)
)
)
else:
return ingest_runnable.batch([file.file for file in files], config)
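Relatedly, decode_ingestion_response above yields consecutive orjson payloads with no delimiter, so a client read that straddles two records will not parse. A hedged tweak (an assumption, not part of this diff) is to newline-delimit each record so the frontend can buffer on "\n" before calling JSON.parse:

    async def decode_ingestion_response(ingest_generator):
        async for x in ingest_generator:
            # a trailing newline gives the client an unambiguous record boundary
            yield orjson.dumps(x) + b"\n"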


@app.get("/health")
20 changes: 11 additions & 9 deletions backend/app/upload.py
@@ -87,7 +87,7 @@ class IngestRunnable(RunnableSerializable[BinaryIO, List[str]]):
assistant_id: Optional[str]
thread_id: Optional[str]
"""Ingested documents will be associated with assistant_id or thread_id.

ID is used as the namespace, and is filtered on at query time.
"""

@@ -108,14 +108,16 @@ def invoke(
self, input: BinaryIO, config: Optional[RunnableConfig] = None
) -> List[str]:
blob = _convert_ingestion_input_to_blob(input)
out = ingest_blob(
blob,
MIMETYPE_BASED_PARSER,
self.text_splitter,
self.vectorstore,
self.namespace,
)
return out
# DEBUG: being rate limited, so skipping this step
return ["irothschild debug, UploadRunnable invoke dummy response"]
# out = ingest_blob(
# blob,
# MIMETYPE_BASED_PARSER,
# self.text_splitter,
# self.vectorstore,
# self.namespace,
# )
# return out


PG_CONNECTION_STRING = PGVector.connection_string_from_db_params(
30 changes: 26 additions & 4 deletions frontend/src/App.tsx
@@ -42,12 +42,34 @@ function App(props: { edit?: boolean }) {
}, new FormData());
formData.append(
"config",
JSON.stringify({ configurable: { thread_id } }),
JSON.stringify({ configurable: { thread_id, show_progress_bar: true } }),
);
await fetch(`/ingest`, {
method: "POST",
body: formData,
const response = await fetch(`/ingest`, {
method: "POST",
body: formData,
});
if (response.body instanceof ReadableStream) {
const total = formData.getAll("files").length;
let progress = 0;

const reader = response.body.getReader();
reader.read().then(function read_progress({ done, value }) {
if (done) {
return;
}
// If the server understands the progress bar, it will send messages like
// [0, msg0], [1, msg1], ...
// Check to make sure we are receiving well formed responses before
// printing progress info.
const data = new TextDecoder().decode(value);
const dataJson = JSON.parse(data);
if (dataJson instanceof Array && dataJson.length == 2 && typeof dataJson[0] === 'number') {
progress += 1;
console.log(`Progress ${progress} / ${total} (Data: ${data})`);
}
reader.read().then(read_progress);
});
}
}

// eslint-disable-next-line @typescript-eslint/no-explicit-any
31 changes: 27 additions & 4 deletions frontend/src/hooks/useConfigList.ts
@@ -92,13 +92,36 @@ export function useConfigList(): ConfigListProps {
}, new FormData());
formData.append(
"config",
JSON.stringify({ configurable: { assistant_id } }),
JSON.stringify({ configurable: { assistant_id, show_progress_bar: true } }),
);
await fetch(`/ingest`, {
method: "POST",
body: formData,

const response = await fetch(`/ingest`, {
method: "POST",
body: formData,
});
if (response.body instanceof ReadableStream) {
const total = formData.getAll("files").length;
let progress = 0;

const reader = response.body.getReader();
reader.read().then(function read_progress({ done, value }) {
if (done) {
return;
}
// If the server understands the progress bar, it will send messages like
// [0, msg0], [1, msg1], ...
// Check to make sure we are receiving well formed responses before
// printing progress info.
const data = new TextDecoder().decode(value);
const dataJson = JSON.parse(data);
if (dataJson instanceof Array && dataJson.length == 2 && typeof dataJson[0] === 'number') {
progress += 1;
console.log(`Progress ${progress} / ${total} (Data: ${data})`);
}
reader.read().then(read_progress);
});
}

setConfigs({ ...savedConfig, mine: true });
return savedConfig.assistant_id;
},
103 changes: 103 additions & 0 deletions tools/eval_tool/eval_tool.py
@@ -0,0 +1,103 @@
#!/usr/bin/env python

import pathlib
import shutil
import string
import tempfile
import time

import click
import docker

# We need to install a set of relevant packages so that our users can use
# the python we've installed for them. If we were working with traditional
# python environments, we might require our users to specify a
# `requirements.txt` file to enumerate their dependencies. This is robust from
# an infra perspective, but most likely places too high a burden on Python code
# generators at the moment. Controlling our python environment also has
# security benefits, but requires us to manually add new capabilities if our
# users supply Python sources with libraries we have not installed on our
# testbed.
DOCKERFILE_TEMPLATE = """
FROM $base_image

# Setup working directory
RUN mkdir -p $workdir
WORKDIR $workdir
RUN chown $user:$group $workdir

# Copy workload into place
ADD $workload $workdir/$workload

# Install python
RUN apk add python3

# Set user identity
USER $user:$group

# Run workload
CMD python $workdir/$workload
"""


@click.command()
@click.argument("workload", metavar="<workload.py>", type=click.Path(exists=True))
def eval_tool(workload: pathlib.Path):
workload = pathlib.Path(workload).absolute()
if not workload.is_file():
raise click.UsageError(f"Filename {workload} does is not a regular file.")
if workload.suffix != ".py":
raise click.UsageError(f"Filename {workload} is not a python file.")
client = docker.from_env()

# In order to add our workload file to the image, we need to provide the
# relative path from the Dockerfile context to the workload file, and the
# file must be in a subtree of the Dockerfile context dir. The Docker SDK
provides the ability to pass a fileobj directly; however, it generates a
# tempfile location for the Dockerfile context, so we cannot place our
# target file inside of the context. Therefore, we generate a tempfile of
# our own and assemble the image that way.
click.echo(f"Building image for {workload}")
with tempfile.TemporaryDirectory() as tmp_dir:
tmp_dir_path = pathlib.Path(tmp_dir)
shutil.copy(workload, tmp_dir_path.joinpath("workload.py"))
with open(tmp_dir_path.joinpath("Dockerfile"), mode="wb") as df:
df.write(
string.Template(DOCKERFILE_TEMPLATE)
.substitute(
base_image="alpine",
Contributor: ooc why not start from a python base image?

Author: Mostly that I am not familiar with a small-ish python base image. I chose alpine because it looked minimal and efficient.
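For reference, a hedged variant of DOCKERFILE_TEMPLATE on an official Python base image; python:3.12-alpine is comparably small, drops the manual apk step, and puts `python` on PATH (the image tag is an assumption, untested here):

    DOCKERFILE_TEMPLATE = """
    FROM python:3.12-alpine

    # Setup working directory
    RUN mkdir -p $workdir && chown $user:$group $workdir
    WORKDIR $workdir

    # Copy workload into place
    ADD $workload $workdir/$workload

    # Set user identity
    USER $user:$group

    # Run workload (python ships with the base image)
    CMD python $workdir/$workload
    """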

workdir="/workdir",
workload="workload.py",
user="nobody",
group="nobody",
)
.encode()
)
image, _ = client.images.build(path=tmp_dir, rm=True)
click.echo(f"Image: {image.id}")

container_opts = {
# Clean up container on exit
"remove": True,
# Create unique cgroup for this container
"cgroupns": "private",
# Do not consume too many resources
"mem_limit": "4gb",
# Do not give extended privileges to the container
"privileged": False,
"security_opt": ["no-new-privileges=true"],
# Mount root filesystem as readonly
"read_only": True,
}
click.echo("Running container")
start = time.time()
output = client.containers.run(image, **container_opts)
end = time.time()
click.echo("Done")
click.echo("========")
click.echo(output)
click.echo("========")
click.echo(f"({end-start})")


if __name__ == "__main__":
eval_tool()
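A minimal smoke test, assuming Docker is running locally (the file name is illustrative): save the workload below as hello.py, then run `python eval_tool.py hello.py`; the tool should build the image, run the container, and echo the captured output plus the elapsed time.

    # hello.py -- trivial workload to exercise the sandboxed runner
    print("hello from inside the container")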