Support Postgres for custom data layer #825

Closed · wants to merge 10 commits

Conversation

@hayescode (Contributor) commented Mar 19, 2024

Overview

  • Adds custom Postgres data layer
  • Inspired by @sandangel's work on Support MongoDB for custom data layer #796
  • Optionally configures ADLS as the blob storage provider (designed to allow other providers in the future)
    • I only have access to Azure so I can't test/configure other providers

Test System

  • OS: Windows 11
  • Python: 3.11.4
  • Postgres: v11

How to configure

Run this SQL DDL in your Postgres database:

DDL
CREATE TABLE users (
    "id" UUID PRIMARY KEY,
    "identifier" TEXT NOT NULL UNIQUE,
    "metadata" JSONB NOT NULL,
    "createdAt" TEXT
);

CREATE TABLE IF NOT EXISTS threads (
    "id" UUID PRIMARY KEY,
    "createdAt" TEXT,
    "name" TEXT,
    "user_id" UUID,
    "tags" TEXT[], 
    "metadata" JSONB,
    FOREIGN KEY ("user_id") REFERENCES users("id") ON DELETE CASCADE
);

CREATE TABLE steps (
    "id" UUID PRIMARY KEY,
    "name" TEXT NOT NULL,
    "type" TEXT NOT NULL,
    "threadId" UUID NOT NULL,
    "parentId" UUID,
    "disableFeedback" BOOLEAN NOT NULL,
    "streaming" BOOLEAN NOT NULL,
    "waitForAnswer" BOOLEAN,
    "isError" BOOLEAN,
    "metadata" JSONB,
    "input" TEXT,
    "output" TEXT,
    "createdAt" TEXT,
    "start" TEXT,
    "end" TEXT,
    "generation" JSONB,
    "showInput" TEXT,
    "language" TEXT,
    "indent" INT,
    FOREIGN KEY ("threadId") REFERENCES threads("id") ON DELETE CASCADE
);

CREATE TABLE IF NOT EXISTS elements (
    "id" UUID PRIMARY KEY,
    "threadId" UUID,
    "type" TEXT,
    "url" TEXT,
    "chainlitKey" TEXT,
    "name" TEXT NOT NULL,
    "display" TEXT,
    "objectKey" TEXT,
    "size" TEXT,
    "page" INT,
    "language" TEXT,
    "forId" UUID,
    "mime" TEXT,
    FOREIGN KEY ("threadId") REFERENCES threads("id") ON DELETE CASCADE,
    FOREIGN KEY ("forId") REFERENCES steps("id") ON DELETE CASCADE
);

CREATE TABLE IF NOT EXISTS feedbacks (
    "id" UUID PRIMARY KEY,
    "forId" UUID NOT NULL,
    "value" INT NOT NULL,
    "strategy" TEXT DEFAULT 'BINARY' NOT NULL,
    "comment" TEXT,
    FOREIGN KEY ("forId") REFERENCES steps("id") ON DELETE CASCADE
);
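As a sanity check on the schema's delete behavior, the ON DELETE CASCADE chain can be demonstrated with a simplified two-table model. The sketch below uses stdlib sqlite3 in place of Postgres (so UUID/JSONB columns become plain TEXT), but the cascade semantics are the same: deleting a user removes that user's threads.

```python
# Simplified illustration of the ON DELETE CASCADE behavior in the DDL above.
# sqlite3 stands in for Postgres here; it needs foreign keys enabled explicitly.
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("PRAGMA foreign_keys = ON")  # sqlite requires this opt-in
conn.execute('CREATE TABLE users ("id" TEXT PRIMARY KEY)')
conn.execute(
    'CREATE TABLE threads ("id" TEXT PRIMARY KEY, "user_id" TEXT, '
    'FOREIGN KEY ("user_id") REFERENCES users("id") ON DELETE CASCADE)'
)
conn.execute("INSERT INTO users VALUES ('u1')")
conn.execute("INSERT INTO threads VALUES ('t1', 'u1')")
conn.execute("DELETE FROM users WHERE id = 'u1'")  # cascades to threads
remaining = conn.execute("SELECT COUNT(*) FROM threads").fetchone()[0]
print(remaining)  # 0: the thread was deleted along with its user
```

The same chain runs users → threads → steps → elements/feedbacks in the full schema, so deleting a user removes all of their data.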

Add POSTGRES_URI environment variable in your .env file

POSTGRES_URI = user=<your_user> host=<your_host> password=<your_password> port=<your_port> sslmode=require dbname=<your_db>
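The value is a libpq-style keyword/value DSN, so every key=value pair must be separated by whitespace (a missing space, e.g. port=5432sslmode=require, silently corrupts the port). As a quick sanity check, a small parser can catch that before the driver sees it. parse_dsn below is a hypothetical stdlib-only helper for illustration, and it ignores libpq's quoting rules:

```python
# Hypothetical helper: validate the keyword/value DSN format used above.
# A fused pair like "port=5432sslmode=require" raises instead of misparsing.
def parse_dsn(dsn: str) -> dict:
    pairs = {}
    for token in dsn.split():
        key, sep, value = token.partition("=")
        if not sep or not key or not value or "=" in value:
            raise ValueError(f"malformed DSN token: {token!r}")
        pairs[key] = value
    return pairs

# Example values below are made up for illustration
dsn = "user=app host=db.example.com password=secret port=5432 sslmode=require dbname=chainlit"
print(parse_dsn(dsn)["dbname"])  # chainlit
```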

Add this to your app.py

import os

import chainlit.data as cl_data
from chainlit.data.postgres import PostgresDataLayer

cl_data._data_layer = PostgresDataLayer(uri=os.environ["POSTGRES_URI"])

Optional: Blob Storage

  • Add your ADLS SAS token to the .env file to enable non-text (blob) storage
ADLS_SAS_TOKEN = <your_SAS_token>
  • Install module
pip install azure-storage-file-datalake==12.14.0
  • Add to app.py
import os
from azure.storage.filedatalake import DataLakeServiceClient

# Build the ADLS client from your account URL and SAS token
blob_storage_client = DataLakeServiceClient("https://<your_account>.dfs.core.windows.net", credential=os.environ["ADLS_SAS_TOKEN"])
cl_data._data_layer.add_blob_storage_client(blob_storage_client=blob_storage_client, access_token=os.environ["ADLS_SAS_TOKEN"])  # Add blob storage client

@dahifi (Contributor) left a comment:

Great effort here, and I'd like to take a closer look at this later. Here are a couple of points from my automated review:

The code is fairly complex and could use some functional refactoring, and perhaps a bit more documentation on the necessary env or config requirements. There are no tests; I would think a basic sanity check is needed here, but I'm not sure what the team considers best practice.

sql_upsert, sql_select, and sql_delete could use a helper function for the duplicated code. I also have a flag for security considerations around SQL injection. I'm not sure of the risk/effort to support parameterized queries.

Database calls should be wrapped with error handling, not just logging. Have you checked the literalAI SDK to see how the team currently handles failures?

TODOs shouldn't be in a PR. They need to be completed or removed prior to approval.

Run this through ruff --autofix as well.

Keep up the good work!

@nixent commented Mar 19, 2024

@hayescode would it make sense to use SQLAlchemy, to allow for the different SQL engines it supports? I'm working on a similar feature and opted for that route.

@hayescode (Contributor, Author) commented:

@dahifi thanks for your review!

> Code is fairly complex, and could use some functional refactoring and perhaps a bit more documentation on necessary env or config requirements. There are no tests. I would think a basic sanity check would be needed here, but I'm not sure what the team considers best practices here.

I tried to follow the existing ChainlitDataLayer as much as possible. Could you expand on what documentation you're looking for? I imagined that, once merged, the content from my PR description would be added to docs.chainlit.io.

I don't see tests on the other data layers, are you talking about the github workflows?

> sql_upsert, sql_select, and sql_delete could use a helper function for the duplicated code. I also have a flag for security considerations against SQL injection. Not sure the risk/effort to support parameterized queries.

This code is already parameterized and, based on my reading of the documentation, currently guards against SQL injection. Could you share some details about the security flag you mention?
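For readers following along, the parameterized pattern under discussion can be sketched as below. build_select is a hypothetical helper for illustration, not code from this PR; the point is that values travel separately from the SQL text, and the driver (psycopg, via %s placeholders) binds them, which is what guards against injection:

```python
# Sketch of query parameterization: values never get spliced into the SQL
# string; they are returned separately for the driver to bind.
def build_select(table: str, where: dict) -> tuple[str, tuple]:
    # Table/column names cannot be parameterized, so they must come from
    # trusted code, never from user input.
    clause = " AND ".join(f'"{col}" = %s' for col in where)
    return f"SELECT * FROM {table} WHERE {clause}", tuple(where.values())

query, params = build_select("threads", {"user_id": "abc", "name": "demo'; DROP TABLE threads;--"})
print(query)   # SELECT * FROM threads WHERE "user_id" = %s AND "name" = %s
print(params)  # the hostile string stays inert inside the parameter tuple
```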

> Database calls should be wrapped with error handling, not just logging. Have you checked in the literalAI SDK to see how the team handles failures currently?

sql_upsert, sql_select, and sql_delete all have try/except blocks in addition to logging, similar to the literal SDK. I'm not sure I understand.

> TODOs shouldn't be in a PR. They need to be completed, or removed prior to approval.

Fair. I put those TODOs more as an FYI to reviewers that I know this code can be improved in various ways, but IMO that's unnecessary for merging, as many are nice-to-haves. I'll remove those comments now.

> Run this through ruff --autofix as well.

I'll look into this. If this is about the mypy test failures, I'm running those through GPT-4 right now.

Keep up the good work!

Thanks again for looking this over and for your feedback!

@hayescode (Contributor, Author) commented:

> @hayescode would it make sense to use SQLAlchemy to allow different SQL engines it supports? I'm working on a similar feature and opted for that route.

@nixent I actually started with SQLAlchemy but ran into a blocker; I can't remember exactly what it was, but psycopg solved it. Is your feature Postgres-based?

@hayescode (Contributor, Author) commented:

@nixent ya know, I recently upgraded my SQLAlchemy version (I had to use a different version because I was also using it for Databricks) and it seems to work now.

I'm going to close this PR for now and try to use that instead because I agree that would be a better solution and people could plug in their own dialects. Great feedback!

@hayescode closed this Mar 19, 2024
@hayescode (Contributor, Author) commented:

@nixent I just refactored the whole thing using SQLAlchemy 2.0.28 and it WORKS! I dunno what the deal was before. Thanks for your comment inspiring this. I'll try to get the PR out tomorrow; I just need to do some more testing and fix these mypy issues. Cheers!
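For anyone attempting the same migration, one way to bridge the libpq-style POSTGRES_URI used earlier to a SQLAlchemy 2.0 connection URL might look like the sketch below. dsn_to_url is a hypothetical helper, not code from this PR, and it assumes the simple unquoted key=value form:

```python
# Hypothetical helper: convert a libpq keyword/value DSN into a SQLAlchemy
# URL of the form driver://user:password@host:port/dbname?sslmode=...
from urllib.parse import quote_plus

def dsn_to_url(dsn: str, driver: str = "postgresql+psycopg") -> str:
    parts = dict(token.split("=", 1) for token in dsn.split())
    auth = quote_plus(parts["user"])
    if "password" in parts:
        auth += ":" + quote_plus(parts["password"])  # escape special chars
    host = parts["host"] + (":" + parts["port"] if "port" in parts else "")
    url = f"{driver}://{auth}@{host}/{parts['dbname']}"
    if "sslmode" in parts:
        url += f"?sslmode={parts['sslmode']}"
    return url

# Example values are made up for illustration
print(dsn_to_url("user=app password=s3cret host=db port=5432 dbname=chainlit sslmode=require"))
# postgresql+psycopg://app:s3cret@db:5432/chainlit?sslmode=require
```

The resulting string could then be passed to sqlalchemy.create_engine; swapping the driver prefix is what lets other dialects plug in.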

@PrJasperRothdale commented Mar 26, 2024

Hey! Thanks a lot for this, it's very interesting and useful!

I have some kind of bug. I'm not sure if it comes from Chainlit, your code, or something I did, but maybe you guys can help:

Basically, when you leave the current conversation (conversation A) by clicking on a past chat, then refresh the page and go back to conversation A by clicking on it in the past chats section and resuming, the messages are wrongly ordered: the user messages all come first, then all the AI messages follow. Furthermore, they are not always ordered the way they were typed. This doesn't happen when you leave the chat session by using new chat or refreshing the page.

I assume this has something to do with the way the messages are ordered when inserting them into or retrieving them from Postgres, but I'm not sure.

Any idea where this might come from? How can I address this? Thanks a lot, guys!

Here is what I mean (the user messages are in the wrong order):

[screenshot: user messages grouped together out of order]

@hayescode (Contributor, Author) commented:

> Basically, when you leave the current conversation (conversation A) by clicking on a past chat, then refresh the page and go back to conversation A by clicking on it in the past chats section and resuming, the messages are wrongly ordered […]

Are you on the latest code? I also saw this but fixed it (I think). I was sorting by createdAt, but it should have been start.
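A toy illustration of why ordering by "start" rather than "createdAt" can matter: if steps get re-persisted on resume, their "createdAt" values may no longer reflect conversational order, while "start" still records when each step actually began. The timestamps below are made up; this is not the PR's code:

```python
# Illustration: sorting resumed-thread steps by "createdAt" vs "start".
# Hypothetical data reproducing the reported symptom (user messages first).
steps = [
    {"output": "user: hi",   "createdAt": "T3", "start": "T0"},
    {"output": "user: why?", "createdAt": "T4", "start": "T2"},
    {"output": "ai: hello",  "createdAt": "T5", "start": "T1"},
]
by_created = [s["output"] for s in sorted(steps, key=lambda s: s["createdAt"])]
by_start = [s["output"] for s in sorted(steps, key=lambda s: s["start"])]
print(by_created)  # ['user: hi', 'user: why?', 'ai: hello'] -- user messages bunched up
print(by_start)    # ['user: hi', 'ai: hello', 'user: why?'] -- true conversational order
```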

@PrJasperRothdale commented Mar 27, 2024

I'm using the 394-line version of the code currently in the main branch. I tried to change the query code near line 140 from

query = """
    SELECT DISTINCT t.id, t."createdAt"
    FROM threads t
    JOIN users u on t.user_id = u.id
    JOIN steps s on t.id = s."threadId"
    LEFT JOIN feedbacks f on s.id = f."forId"
    WHERE u."identifier" = %s
"""
if filters.search is not None:
    query += "AND s.\"output\" ILIKE %s\n"
    query_params.append(f"%{str(filters.search)}%")
if filters.feedback is not None and filters.feedback != 0:
    query += "AND f.\"value\" IN (%s)\n"
    query_params.append(str(filters.feedback))
query += "ORDER BY t.\"createdAt\" DESC\n"
query += "LIMIT %s"
query_params.append(str(pagination.first))

to

query = """
    SELECT DISTINCT t.id, t."createdAt", s."start"
    FROM threads t
    JOIN users u on t.user_id = u.id
    JOIN steps s on t.id = s."threadId"
    LEFT JOIN feedbacks f on s.id = f."forId"
    WHERE u."identifier" = %s
"""
if filters.search is not None:
    query += "AND s.\"output\" ILIKE %s\n"
    query_params.append(f"%{str(filters.search)}%")
if filters.feedback is not None and filters.feedback != 0:
    query += "AND f.\"value\" IN (%s)\n"
    query_params.append(str(filters.feedback))
query += "ORDER BY s.\"start\" DESC\n"
query += "LIMIT %s"
query_params.append(str(pagination.first))

but I still have issues with message ordering. Is that what you meant?

Also, it would seem after several tests that user and AI messages are not necessarily bundled. It really just seems like when you click a past chat and then refresh, it messes up the order of whatever conversation you were having before the past chat.

@hayescode (Contributor, Author) commented:

This PR is dead; look at #836.
