
[candidate_isaac] opengpts: ingest progress bar and python eval tool #326

Draft: 2 commits into main

Conversation

@isaacwr commented Apr 30, 2024

A couple of notes

  • Realistically, these should be separate PRs, but I'm keeping them together to simplify things.
  • There are a lot of test changes included here because the rate limit on the free tier of the OpenAI API is low enough that I cannot get any real-life tests to run.
  • There are some formatting changes my editor added that should have been removed. Once I get the hang of git add --interactive, I'll leave those out ;)

In this diff

  1. For the /ingest endpoint on the OpenGPTs server, stream updates back to the caller as each file completes uploading.
  2. Unrelated: add a script that runs provided Python source in a simple, secure Docker container.

/ingest

Considerations

The key change in this section is to wrap the ingest_runnable.abatch_as_completed async generator in a StreamingResponse. In server.py:

return StreamingResponse(
  decode_ingestion_response(
    ingest_runnable.abatch_as_completed(
      [file.file for file in files], config
    )
  )
)
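
For reference, decode_ingestion_response is the small async generator that serializes each completed item as it arrives. A minimal sketch consistent with the yield orjson.dumps(x) line that appears in the review below (the PR's actual implementation may differ):

import orjson
from typing import Any, AsyncIterator

async def decode_ingestion_response(
    responses: AsyncIterator[Any],
) -> AsyncIterator[bytes]:
    # abatch_as_completed yields (index, result) pairs as each file finishes,
    # which lets the client map progress back to a specific upload.
    async for x in responses:
        yield orjson.dumps(x)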

Test Plan

Open the OpenGPTs frontend in a browser. Navigate to an existing Thread and upload multiple files along with a dummy message. Observe that progress information is printed via console.log.

Modify the source to omit show_progress_bar on the client side, and confirm that no progress info is printed.

Run the client against both the previous and the new version of the server to show that providing show_progress_bar to a server which does not understand it does not change its behavior.

For a smaller test case, you can use the following script to make an /ingest request:

async function runTest() {
  const url = new URL("http://localhost:5173/ingest?user=me");
  const formData = make_form_data(true);
  console.log(formData);
  return await make_request(url, formData);
}

function make_form_data(show_progress_bar: boolean): FormData {
  // Assemble request with assistant ID, optional progress bar flag, and two fake files
  const formData = new FormData();
  const fileA = new File(["AAA. This is the content for file A\n"], "A", { type: "text/plain" });
  const fileB = new File(["BBB. This is the content for file B\n"], "B", { type: "text/plain" });
  formData.append("files", fileA);
  formData.append("files", fileB);
  formData.append("config", JSON.stringify({ configurable: { assistant_id: "f08f6330-c5a2-42c7-8e7c-80aade10b1c5", show_progress_bar: show_progress_bar } }));

  return formData;
}

async function make_request(url: URL, formData: FormData) {
  const response = await fetch(url, {
    method: "POST",
    body: formData,
  });
  if (response.body instanceof ReadableStream) {
    const total = formData.getAll("files").length;
    let progress = 0;

    const reader = response.body.getReader();
    reader.read().then(function read_progress({ done, value }) {
      if (!done) {
        // If the server understands the progress bar, it will send messages like
        // [0, msg0], [1, msg1], ...
        // Check to make sure we are receiving well formed responses before
        // printing progress info.
        const data = new TextDecoder().decode(value);
        const dataJson = JSON.parse(data);
        if (Array.isArray(dataJson) && dataJson.length === 2 && typeof dataJson[0] === "number") {
          progress += 1;
          console.log(`Progress ${progress} / ${total} (Data: ${data})`);
        }
        reader.read().then(read_progress);
      }
    });
  }
  return response
}

runTest().then(data => console.log(data));

Python Source

Considerations

The main tradeoff in this script is how the Docker container gets created. There are two basic approaches that come to mind:

  1. Build a dedicated scheduler: at its most basic level, a server that takes incoming requests, creates and runs containers as needed (potentially on different hosts), keeps track of the state of the current workload, and cleans up after itself. The advantages are that (a) we can better control resource usage by killing and cleaning up long-running containers, and (b) we get better separation of concerns and room for optimization (e.g. sharing the Python installation across containers instead of reinstalling it in each image). The downside is that we realistically do not want to run our own scheduler unless we have a very good reason: schedulers are complex and heavy.
  2. Run containers ad hoc: each time we receive a request, we start from scratch, build an image, run the workload, and wait for the result. This is dead simple, but it limits our ability to manage resources, schedule workloads intelligently onto free hosts, and monitor for basic failures like stuck jobs.

For this diff, I implemented the ad-hoc strategy because we don't have a compelling reason to add the complexity of the full server approach, and the ad-hoc approach is portable and flexible.
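
As a rough illustration of the ad-hoc flow only (the image tag, user, resource limits, and timeout below are assumptions for the sketch, not necessarily what the script in this diff does):

import subprocess
import uuid

def run_untrusted(workdir: str) -> str:
    # Build a throwaway image from the generated Dockerfile in workdir,
    # run it once as an unprivileged user with no network and basic
    # resource caps, and return whatever the program printed.
    tag = f"python-eval-{uuid.uuid4().hex[:8]}"
    subprocess.run(["docker", "build", "-t", tag, workdir], check=True)
    result = subprocess.run(
        [
            "docker", "run", "--rm",
            "--user", "nobody",
            "--network", "none",
            "--memory", "256m",
            "--cpus", "0.5",
            tag,
        ],
        capture_output=True,
        text=True,
        timeout=60,
        check=False,
    )
    return result.stdout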

Test Plan

Save the following Python source as e.g. test.py, run it through the script, and observe that easy escalations to root are not possible.

import os
import pwd
import subprocess

user = pwd.getpwuid(os.getuid())
print(f"This is a test being run by user {user.pw_name} ({user.pw_uid}:{user.pw_gid})")

try:
    print(subprocess.check_output("su -", shell=True, text=True))
except Exception as e:
    print(f"Failed to become root via subprocess: {e}")

try:
    os.setuid(1)
except Exception as e:
    print(f"Failed to become root via os.setuid(1): {e}")

@@ -44,7 +48,27 @@ async def ingest_files(
     if thread is None:
         raise HTTPException(status_code=404, detail="Thread not found.")

-    return ingest_runnable.batch([file.file for file in files], config)
+    if config["configurable"].get("show_progress_bar"):
+        ingest_runnable.abatch_as_completed([file.file for file in files], config)

Contributor:

I assume this shouldn't be here?

Author:

Yes sorry, this is a mistake.

@@ -44,7 +48,27 @@ async def ingest_files(
     if thread is None:
         raise HTTPException(status_code=404, detail="Thread not found.")

-    return ingest_runnable.batch([file.file for file in files], config)
+    if config["configurable"].get("show_progress_bar"):

Contributor:

would maybe be better to branch based on request headers? eg the Accept header

Author:

Using the Accept header here makes sense
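
A minimal sketch of that branching (the header check and handler shape here are an assumption, not what this PR currently implements; ingest_runnable and decode_ingestion_response are the objects already used in this PR's server.py):

from fastapi import Request
from fastapi.responses import StreamingResponse

async def ingest_files(request: Request, files, config):
    # Hypothetical: pick the response style from the Accept header instead of
    # a config flag. Clients that ask for an event stream get streamed
    # progress; everyone else gets the existing batch behavior.
    if "text/event-stream" in request.headers.get("accept", ""):
        return StreamingResponse(
            decode_ingestion_response(
                ingest_runnable.abatch_as_completed(
                    [file.file for file in files], config
                )
            )
        )
    return ingest_runnable.batch([file.file for file in files], config)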

        yield orjson.dumps(x)
        await asyncio.sleep(3)  # Debug: to demonstrate streaming

    return StreamingResponse(

Contributor:

ideally we'd use a more standard streaming format, eg Server Sent Events

Author:

I'm not familiar with Server Sent Events, but just for completeness, what's the concern with fastapi.StreamingResponse()?
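
For what it's worth, Server-Sent Events is a framing convention rather than a replacement for StreamingResponse: the same generator can emit data: frames and be served with media_type="text/event-stream". A rough sketch, with the (index, result) payload shape assumed from abatch_as_completed:

import orjson
from typing import Any, AsyncIterator
from fastapi.responses import StreamingResponse

async def sse_ingestion_response(
    responses: AsyncIterator[Any],
) -> AsyncIterator[bytes]:
    # Each completed item becomes one SSE event: "data: <json>\n\n".
    async for x in responses:
        yield b"data: " + orjson.dumps(x) + b"\n\n"

# StreamingResponse is still the transport; SSE just standardizes the framing:
# return StreamingResponse(sse_ingestion_response(results),
#                          media_type="text/event-stream")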

df.write(
    string.Template(DOCKERFILE_TEMPLATE)
    .substitute(
        base_image="alpine",

Contributor:

ooc why not start from a python base image?

Author:

Mostly that I am not familiar with a small-ish python base image. I chose alpine because it looked minimal and efficient.
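
The official python images do publish small variants (e.g. python:3.12-slim or python:3.12-alpine) that already ship an interpreter, so the template would not need to install Python itself. A purely illustrative template along those lines (tag, file names, and commands are assumptions, not what this script's DOCKERFILE_TEMPLATE generates):

import string

# Illustrative only: a tiny Dockerfile template built around a slim official
# Python image; the real DOCKERFILE_TEMPLATE in this diff may look different.
EXAMPLE_TEMPLATE = string.Template(
    "FROM $base_image\n"
    "COPY $script /app/$script\n"
    "USER nobody\n"
    'CMD ["python", "/app/$script"]\n'
)

print(EXAMPLE_TEMPLATE.substitute(base_image="python:3.12-slim", script="test.py"))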
