-
Notifications
You must be signed in to change notification settings - Fork 824
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[candidate_isaac] opengpts: ingest progress bar and python eval tool #326
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,18 +1,22 @@ | ||
import asyncio | ||
import logging | ||
import os | ||
from pathlib import Path | ||
|
||
import orjson | ||
from fastapi import FastAPI, Form, UploadFile | ||
from fastapi.exceptions import HTTPException | ||
from fastapi.responses import StreamingResponse | ||
from fastapi.staticfiles import StaticFiles | ||
|
||
|
||
import app.storage as storage | ||
from app.api import router as api_router | ||
from app.auth.handlers import AuthedUser | ||
from app.lifespan import lifespan | ||
from app.upload import ingest_runnable | ||
|
||
|
||
logger = logging.getLogger(__name__) | ||
|
||
app = FastAPI(title="OpenGPTs API", lifespan=lifespan) | ||
|
@@ -44,7 +48,27 @@ async def ingest_files( | |
if thread is None: | ||
raise HTTPException(status_code=404, detail="Thread not found.") | ||
|
||
return ingest_runnable.batch([file.file for file in files], config) | ||
if config["configurable"].get("show_progress_bar"): | ||
ingest_runnable.abatch_as_completed([file.file for file in files], config) | ||
Reviewer comment: I assume this stray `abatch_as_completed` call shouldn't be here? Author reply: Yes, sorry — this line is a mistake and should be removed.
||
|
||
# The return type of the IngestRunnable is not compatible with the | ||
# FastAPI StreamingResponse (the data must have an `.encode()` | ||
# property). The results are (int, list) tuples, so lets use orjson to | ||
# byte the tuple | ||
async def decode_ingestion_response(ingest_generator): | ||
async for x in ingest_generator: | ||
yield orjson.dumps(x) | ||
await asyncio.sleep(3) # Debug: to demonstrate streaming | ||
|
||
return StreamingResponse( | ||
Reviewer comment: ideally we'd use a more standard streaming format here, e.g. Server-Sent Events. Author reply: I'm not familiar with Server-Sent Events, but just for completeness, what's the concern with the current approach?
||
decode_ingestion_response( | ||
ingest_runnable.abatch_as_completed( | ||
[file.file for file in files], config | ||
) | ||
) | ||
) | ||
else: | ||
return ingest_runnable.batch([file.file for file in files], config) | ||
|
||
|
||
@app.get("/health") | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,103 @@ | ||
#!/usr/bin/env python | ||
|
||
import click | ||
import pathlib | ||
import docker | ||
import tempfile | ||
import string | ||
import shutil | ||
import time | ||
|
||
# We need to install a set of relevant packages so that our users can use
# the python we've installed for them. If we were working with traditional
# python environments, we might require our users to specify a
# `requirements.txt` file to enumerate their dependencies. This is robust from
# an infra perspective, but most likely places too high a burden on Python code
# generators at the moment. Controlling our python environment also has
# security benefits, but requires us to manually add new capabilities if our
# users supply Python sources with libraries we have not installed on our
# testbed.
#
# Template placeholders ($base_image, $workdir, $workload, $user, $group) are
# filled in via string.Template.substitute() by the caller.
DOCKERFILE_TEMPLATE = """
FROM $base_image

# Setup working directory
RUN mkdir -p $workdir
WORKDIR $workdir
RUN chown $user:$group $workdir

# Copy workload into place
ADD $workload $workdir/$workload

# Install python
RUN apk add python3

# Set user identity
USER $user:$group

# Run workload
# NOTE: Alpine's `python3` package installs the interpreter only as
# `python3` (no bare `python` alias), so the CMD must invoke `python3`,
# otherwise the container fails at runtime with "python: not found".
CMD python3 $workdir/$workload
"""
|
||
|
||
@click.command()
@click.argument("workload", metavar="<workload.py>", type=click.Path(exists=True))
def eval_tool(workload: pathlib.Path):
    """Build a sandboxed Docker image around a Python workload and run it.

    The workload file is copied into a fresh image built from
    DOCKERFILE_TEMPLATE, executed as the unprivileged ``nobody`` user with
    a read-only root filesystem, a private cgroup namespace, and a memory
    cap, and its captured output plus wall-clock runtime are echoed back.

    Raises click.UsageError if the path is not a regular ``.py`` file.
    """
    workload = pathlib.Path(workload).absolute()
    if not workload.is_file():
        raise click.UsageError(f"Filename {workload} is not a regular file.")
    if workload.suffix != ".py":
        raise click.UsageError(f"Filename {workload} is not a python file.")
    client = docker.from_env()

    # In order to add our workload file to the image, we need to provide the
    # relative path from the Dockerfile context to the workload file, and the
    # file must be in a subtree of the Dockerfile context dir. The Docker SDK
    # provides the ability to pass a fileobj directly, however, it generates a
    # tempfile location for the Dockerfile context, and so we cannot place our
    # target file inside of the context. Therefore, we generate a tempfile of
    # our own, and assemble the image that way.
    click.echo(f"Building image for {workload}")
    with tempfile.TemporaryDirectory() as tmp_dir:
        tmp_dir_path = pathlib.Path(tmp_dir)
        shutil.copy(workload, tmp_dir_path.joinpath("workload.py"))
        with open(tmp_dir_path.joinpath("Dockerfile"), mode="wb") as df:
            df.write(
                string.Template(DOCKERFILE_TEMPLATE)
                .substitute(
                    base_image="alpine",
                    workdir="/workdir",
                    workload="workload.py",
                    user="nobody",
                    group="nobody",
                )
                .encode()
            )
        # Build while the temp context still exists; the image itself
        # persists in the Docker daemon after the context is cleaned up.
        image, _ = client.images.build(path=tmp_dir, rm=True)
    click.echo(f"Image: {image.id}")

    container_opts = {
        # Clean up container on exit
        "remove": True,
        # Create unique cgroup for this container
        "cgroupns": "private",
        # Do not consume too many resources
        "mem_limit": "4gb",
        # Do not give extended privileges to the container
        "privileged": False,
        "security_opt": ["no-new-privileges=true"],
        # Mount root filesystem as readonly
        "read_only": True,
    }
    click.echo("Running container")
    start = time.time()
    output = client.containers.run(image, **container_opts)
    end = time.time()
    click.echo("Done")
    click.echo("========")
    click.echo(output)
    click.echo("========")
    click.echo(f"({end-start})")


if __name__ == "__main__":
    eval_tool()
Reviewer comment: it would maybe be better to branch based on request headers — e.g. the `Accept` header — rather than a `show_progress_bar` config flag. Author reply: Using the `Accept` header here makes sense.