
Support SQLAlchemy for custom data layer #836

Merged: 42 commits, Apr 15, 2024

Conversation

hayescode
Contributor

@hayescode hayescode commented Mar 21, 2024

Overview

  • Adds a custom, direct-database data layer using SQLAlchemy, with support for a wide range of SQL dialects
  • Dramatically improved loading speeds of list_threads and get_threads versus the previous recursive approach.
    • The code is somewhat long and unwieldy, but the end performance is worth it.
  • Optionally configures ADLS as the blob storage provider (designed to allow other providers in the future)
    • I only have access to Azure so I can't test/configure other providers
  • Duplicated PageInfo and PaginatedResponse from literal SDK into backend/chainlit/types.py and updated typing.
  • Note: I had to add locks because the backend uses asyncio.gather and create_step is sometimes attempted before update_thread, causing a foreign key violation and a race condition.
    • I prefer this approach in order to maintain referential integrity of the database.
    • April 2, 2024: Removed most foreign key constraints because the Chainlit backend executes tasks concurrently via .gather(), causing create_step to be called before update_thread and resulting in foreign key violations. I assume this was done for performance reasons. The backend would have to be reworked to handle this, and a custom solution adds complexity that should not exist.
  • Added tests to new cypress/e2e/custom_data_layer.py.
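The ordering problem described in the note above can be reproduced in miniature. This is an illustrative sketch, not code from the PR: it shows how asyncio.gather lets a fast task finish before a slower one that was scheduled first, which is exactly how a steps row can arrive before its threads row.

```python
import asyncio

calls = []

async def update_thread():
    await asyncio.sleep(0.01)  # simulate a slower DB round-trip
    calls.append("update_thread")

async def create_step():
    calls.append("create_step")  # finishes immediately

async def main():
    # update_thread is scheduled first, but create_step completes first
    await asyncio.gather(update_thread(), create_step())

asyncio.run(main())
# calls is now ["create_step", "update_thread"]: a foreign key from
# steps.threadId to threads.id would be violated at this point.
```

This is why a lock alone cannot fix the problem (it prevents interleaving, not ordering), and why the FK constraints were ultimately dropped.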

Shout-Outs

  • Thank you @nixent for suggesting SQLAlchemy as a broader solution that supports a larger number of developers!
  • Inspired by @sandangel, who showed that this can be done and shared their work
  • For testing I lifted your Cypress tests wholesale, @tjroamer. I had never written these before.

Test System

  • OS: Windows 11
  • Python: 3.11.4
  • Postgres: v11

How to configure

Install the necessary dependencies for this custom data layer.

pip install "chainlit[custom-data]" --upgrade

Run this SQL DDL in your SQL database:

DDL
CREATE TABLE users (
    "id" UUID PRIMARY KEY,
    "identifier" TEXT NOT NULL UNIQUE,
    "metadata" JSONB NOT NULL,
    "createdAt" TEXT
);

CREATE TABLE IF NOT EXISTS threads (
    "id" UUID PRIMARY KEY,
    "createdAt" TEXT,
    "name" TEXT,
    "userId" UUID,
    "userIdentifier" TEXT,
    "tags" TEXT[], 
    "metadata" JSONB,
    FOREIGN KEY ("userId") REFERENCES users("id") ON DELETE CASCADE
);

CREATE TABLE IF NOT EXISTS steps (
    "id" UUID PRIMARY KEY,
    "name" TEXT NOT NULL,
    "type" TEXT NOT NULL,
    "threadId" UUID NOT NULL,
    "parentId" UUID,
    "disableFeedback" BOOLEAN NOT NULL,
    "streaming" BOOLEAN NOT NULL,
    "waitForAnswer" BOOLEAN,
    "isError" BOOLEAN,
    "metadata" JSONB,
    "tags" TEXT[], 
    "input" TEXT,
    "output" TEXT,
    "createdAt" TEXT,
    "start" TEXT,
    "end" TEXT,
    "generation" JSONB,
    "showInput" TEXT,
    "language" TEXT,
    "indent" INT
);

CREATE TABLE IF NOT EXISTS elements (
    "id" UUID PRIMARY KEY,
    "threadId" UUID,
    "type" TEXT,
    "url" TEXT,
    "chainlitKey" TEXT,
    "name" TEXT NOT NULL,
    "display" TEXT,
    "objectKey" TEXT,
    "size" TEXT,
    "page" INT,
    "language" TEXT,
    "forId" UUID,
    "mime" TEXT
);

CREATE TABLE IF NOT EXISTS feedbacks (
    "id" UUID PRIMARY KEY,
    "forId" UUID NOT NULL,
    "value" INT NOT NULL,
    "comment" TEXT
);

Add the SQLALCHEMY_CONNINFO environment variable to your .env file:

SQLALCHEMY_CONNINFO = <dialect>+<driver>://<user>:<password>@<host>:<port>/<database>
ADLS_SAS_TOKEN = <your_SAS_token> # Optional

Add this to your app.py

import os

import chainlit.data as cl_data
from chainlit.data.sql_alchemy import SQLAlchemyDataLayer
from chainlit.data.storage_clients import AzureStorageClient

storage_client = AzureStorageClient(
    account_url="https://<your_account>.dfs.core.windows.net",
    container="<your_container>",
    credential="<your_credential>",  # e.g. an access key or azure.identity credential
    sas_token=os.environ.get("ADLS_SAS_TOKEN"),  # optional
)
cl_data._data_layer = SQLAlchemyDataLayer(
    conninfo=os.environ["SQLALCHEMY_CONNINFO"],
    ssl_require=True,
    storage_provider=storage_client,
    user_thread_limit=100,
)


@nixent nixent left a comment


Thank you for turning to SQLAlchemy that fast! I have a couple of comments, primarily about DB init instead of running DDL scripts.

Commit note: DALLE, for example, can return a URL that is only valid for a limited time, so we now persist those files.
@sandangel
Contributor

Can we add an interface/adapter for the blob storage client?
For example: the client needs to implement upload_file, delete_file, and download_file. Then users can just bring their own client implementing that interface, instead of having to make a PR to Chainlit to support their blob storage choice.
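The suggested interface could be sketched as below. This is a hypothetical sketch: the method names come from the comment above, while the Protocol shape and the toy in-memory client are assumptions for illustration.

```python
from typing import Protocol


class BlobStorageClient(Protocol):
    """Hypothetical interface: any provider implementing these three methods works."""

    def upload_file(self, key: str, data: bytes) -> str: ...
    def delete_file(self, key: str) -> None: ...
    def download_file(self, key: str) -> bytes: ...


class InMemoryStorageClient:
    """Toy implementation; a real one would wrap boto3, azure-storage-blob, etc."""

    def __init__(self) -> None:
        self._store: dict[str, bytes] = {}

    def upload_file(self, key: str, data: bytes) -> str:
        self._store[key] = data
        return f"memory://{key}"

    def delete_file(self, key: str) -> None:
        del self._store[key]

    def download_file(self, key: str) -> bytes:
        return self._store[key]
```

Because the Protocol is structural, user-supplied clients need no import from Chainlit at all; they only need the right method signatures.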

@sandangel
Contributor

I also suggest using memcached or session storage, but this is already a good starting point.

@hayescode
Contributor Author

can we add an interface/adapter for blob storage client? For example: the client needs to implement upload_file, delete_file, download_file. then user can just have their own client and implement that interface instead of having to make a PR to chainlit to support their blob storage choice?

@sandangel I was thinking about making another file called chainlit/backend/data/blob.py or something, where we could have classes for each blob storage provider. I only have access to Azure, though, so I cannot implement the others.

I added the functions based on the requirements in the Chainlit documentation, except for delete_user_session, because I cannot see a purpose for it. Where are you seeing delete_file and download_file?

@sandangel
Contributor

sandangel commented Mar 27, 2024

@hayescode

Where are you seeing delete_file and download_file?

I think they are hidden in the literalai API client. That is why I chose to implement the literalai API instead of BaseDataLayer in my MongoDB PR.

@sandangel
Contributor

sandangel commented Mar 27, 2024

I was thinking about make another file called chainlit/backend/data/blob.py or something where we could have classes for each blob storage provider. I only have access to Azure though, so cannot implement the others.

I think this does not scale well, for the reason I mentioned earlier:

having to make a PR to chainlit to support their blob storage choice

As the number of providers grows, it will add more maintenance burden for us. That is why I suggest having a common interface on the Chainlit side, while on the users' side they use their own blob storage client, similar to how Chainlit provides BaseDataLayer.

As for SQLAlchemyDataLayer in this PR, I think it will only serve as an example implementation maintained by the community, not something battle-tested and ready to use in production at a scale beyond 100K users. For example, we would also need a queue system for the write path, and a key-value store for sessions and caching for faster querying and display.

@hayescode
Contributor Author

I think they are hidden in literalai API client.

I am not looking at Literal because it's irrelevant for a custom data layer. The devs' advice in the project ticket is to inherit from BaseDataLayer, so that's what this implements.

That is why I suggest having a common interface on the chainlit side, and on the users side they can use their own blob storage client.

@sandangel I don't understand what you mean, I guess. If you have code for this, please share.

In any case, this isn't a fully feature-complete PR, and we should expect (and welcome!) more enhancements. I can only test some SQL dialects and storage providers. Tbh I'm in no rush, as I'm already live with this. Data persistence has blocked my team and me for months, so I made this mostly out of frustration, haha.

@sandangel
Contributor

sandangel commented Mar 28, 2024

@hayescode sorry for the confusion. Here is some pseudocode:

# chainlit code

from typing import Protocol


class BlobStorageClient(Protocol):
    def upload_file(self, key: str, data: bytes) -> str:
        pass


class SQLAlchemyDataLayer(BaseDataLayer):

    def add_blob_storage_client(self, blob_storage_client: BlobStorageClient) -> None:
        self.blob_storage_client = blob_storage_client
        logger.info("Blob Storage client initialized")


    @queue_until_user_message()
    async def create_element(self, element: 'Element'):
        # ...

        element.url = self.blob_storage_client.upload_file(key=element.key, data=element.data)


# user code:

class S3StorageClient:
    def upload_file(self, key: str, data: bytes) -> str:
        import boto3

        s3_client = boto3.client("s3")
        s3_client.put_object(Bucket="my-bucket", Key=key, Body=data)

        return f"s3://my-bucket/{key}"

cl_data._data_layer = SQLAlchemyDataLayer()
cl_data._data_layer.add_blob_storage_client(S3StorageClient())

@hayescode
Contributor Author

@tpatel I ended up leveraging the Chainlit context by adding this to each of the functions, to ensure DB writes only occur when a valid user is in context.

if not getattr(context.session.user, 'id', None):
    raise ValueError("No authenticated user in context")

If you know of a better way, please let us know. If not, I believe all action items have been addressed?

@tpatel
Collaborator

tpatel commented Apr 10, 2024

@hayescode looks good; however, I'm getting a Chainlit context not found error because POST /project/threads is called before the websocket session is established, so the context isn't defined yet.

I'm looking into how we could make this work.

@hayescode
Contributor Author

@tpatel I removed the chainlit.context check from everything except the create/upsert methods. This gives us the desired behavior: non-authenticated users cannot write to the DB, while functionality is maintained for authenticated users.

@tpatel
Collaborator

tpatel commented Apr 10, 2024

Neat, it makes a ton of sense to remove the checks from the read-only methods, because they are called from already-authenticated routes (so they won't be called if the user isn't correctly authenticated) 👍

Could we switch to silently ignoring (e.g. return None) instead of raising an exception when getattr(context.session.user, 'id', None) is None? This would prevent folks who use Chainlit without authentication from having exception stack traces in their logs.
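The silent-ignore suggestion could look like the following sketch. Everything here is hypothetical (maybe_persist and the stand-in user objects are made up for illustration); it only demonstrates the return-None-instead-of-raise pattern.

```python
from types import SimpleNamespace

def maybe_persist(user, write):
    """Skip the write silently when there is no authenticated user id."""
    if not getattr(user, "id", None):
        return None  # no-op: unauthenticated apps see no stack traces
    return write()

anonymous = SimpleNamespace(id=None)
alice = SimpleNamespace(id="u-123")

assert maybe_persist(anonymous, lambda: "written") is None
assert maybe_persist(alice, lambda: "written") == "written"
```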

async def delete_user_session(self, id: str) -> bool:
    return False  # Not sure why documentation wants this

async def get_all_user_threads(self, user_id: Optional[str] = None, thread_id: Optional[str] = None) -> Optional[List[ThreadDict]]:
Contributor


I'm not sure how Chainlit is using this function, but if it is only used for displaying the list of threads, then I think we should not query all the information inside each thread. That should happen when the user clicks to resume the chat.

Contributor Author

@hayescode hayescode Apr 10, 2024


This function returns a List[ThreadDict], which is basically everything, to your point. I was able to speed it up a lot by querying everything at once per user, instead of recursively fetching each thread, then the steps in that thread, etc., to build it.

This is also used for the search and feedback filters.

I added a user_thread_limit to cap threads, since the count will only grow. I have several users at my limit (100 threads) and performance is fine. Are you seeing performance problems, or just asking the Chainlit devs why it's like this?
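The query-once-then-group approach can be illustrated like this. It is a sketch with made-up row shapes, not the PR's actual SQL: fetch all threads and steps for a user in one joined query, then group in Python instead of issuing one query per thread.

```python
# Rows as they might come back from a single JOIN of threads and steps
rows = [
    {"thread_id": "t1", "thread_name": "First", "step_id": "s1"},
    {"thread_id": "t1", "thread_name": "First", "step_id": "s2"},
    {"thread_id": "t2", "thread_name": "Second", "step_id": "s3"},
]

threads = {}
for row in rows:
    # setdefault creates the thread dict the first time its id is seen
    thread = threads.setdefault(
        row["thread_id"],
        {"id": row["thread_id"], "name": row["thread_name"], "steps": []},
    )
    thread["steps"].append(row["step_id"])

# Two threads, each carrying its own steps, from one round-trip's worth of rows
assert list(threads) == ["t1", "t2"]
assert threads["t1"]["steps"] == ["s1", "s2"]
```

One round-trip plus O(rows) grouping replaces N+1 round-trips, which is where the speedup comes from.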

Collaborator


This function isn't part of BaseDataLayer, so it can be defined and used within this custom data layer as needed.

@sandangel I agree that there might be some performance improvements to be made; I think it's fine for the first release of this custom data layer 👍

Comment on lines 255 to 256
if not getattr(context.session.user, 'id', None):
    raise ValueError("No authenticated user in context")
Collaborator


You need to remove this check. upsert_feedback is only called from a route, so it's outside of the user session (so attempting to use the context will always throw).

I don't have a good solution to prevent adding feedback to the DB for non-authenticated chainlit apps for now.

Suggested change
if not getattr(context.session.user, 'id', None):
    raise ValueError("No authenticated user in context")

Contributor Author


Maybe we're overthinking this. I don't see a scenario where authenticated and non-authenticated users coexist in the same app/DB. With authentication enabled, users can't use my app without authenticating. And if someone doesn't have authentication set up, I don't know why they'd set up a custom data layer.

I just don't see how this scenario we're trying to guard against would be possible, but maybe I'm missing something?

Collaborator


Yes, this is an edge case, I agree. I can sadly think of at least one scenario where it could happen: an app owner disables authentication for a few weeks.
To be fair, this is the only thing I've found in today's testing session, and getting it fixed would enable me to do a last code review / testing session.

An option I've just thought of is to use the require_login method instead of checking the context:

def require_login():
    return (
        bool(os.environ.get("CHAINLIT_CUSTOM_AUTH"))
        or config.code.password_auth_callback is not None
        or config.code.header_auth_callback is not None
        or is_oauth_enabled()
    )

Contributor Author


I like this idea! Then we can just block initialization of the custom data layer itself when there is no authentication. Great solution!

Where would we call require_login()? I've tried it in app.py before the decorators and in sql_alchemy.py, but it returns None for me even when I do have authentication enabled.

from chainlit.auth import require_login
is_authentication_enabled = require_login()

class SQLAlchemyDataLayer(BaseDataLayer):
    def __init__(self, conninfo: str, ssl_require: bool = False, storage_provider: Optional[BaseStorageClient] = None, user_thread_limit: Optional[int] = 1000):
        if not is_authentication_enabled:
            print(f'is_authentication_enabled: {is_authentication_enabled}')
            raise PermissionError("Authentication is required to use SQLAlchemyDataLayer")

@tpatel tpatel self-requested a review April 11, 2024 17:02
@intc-hharshtk

Is it possible to continue from a previous thread?

Collaborator

@tpatel tpatel left a comment


✨✨✨ AWESOME ✨✨✨

Thanks for your work @hayescode! I believe this feature is ready now; we can still disable persistence and display a proper warning for Chainlit apps without authentication in a future PR.

@tpatel tpatel merged commit 2cd87ec into Chainlit:main Apr 15, 2024
4 checks passed
@hayescode
Contributor Author

@tpatel, thank you very much for your assistance and advice throughout this process! I'm excited to see how the Chainlit community evolves this functionality!

tpatel pushed a commit to tpatel/chainlit that referenced this pull request Apr 15, 2024
- adds a custom, direct-database data layer using SQLAlchemy, with support for a wide range of SQL dialects
- configures ADLS or S3 as the blob storage provider
- duplicated `PageInfo` and `PaginatedResponse` from the literal SDK into backend/chainlit/types.py and updated typing
@JeanRessouche

Hi, it seems this implementation does not support Azure SQL Database: I get no error at boot, but a Bad Request: 'User not persisted' 400 error, even though the user is correctly stored in the users table. Also, on thread creation, an error occurs storing thread.tags, because the TEXT[] datatype does not exist in Azure SQL.

import os
import urllib.parse

from sqlalchemy import create_engine
from sqlalchemy.engine import URL

import chainlit.data as cl_data
from chainlit.data.sql_alchemy import SQLAlchemyDataLayer

driver = 'ODBC Driver 17 for SQL Server'

params = urllib.parse.quote_plus(
    'Driver=%s;' % driver +
    'Server=tcp:%s,1433;' % os.environ["AZURE_DB_HOST"] +
    'Database=%s;' % os.environ["AZURE_DB_DATABASE"] +
    'Uid=%s;' % os.environ["AZURE_DB_USERNAME"] +
    'Pwd={%s};' % os.environ["AZURE_DB_PASSWORD"] +
    'Persist Security Info=False;' +
    'MultipleActiveResultSets=False;' +
    'Encrypt=yes;' +
    'TrustServerCertificate=no;' +
    'Connection Timeout=30;')

conn_str = 'mssql+pyodbc:///?odbc_connect={}'.format(params)
engine_azure = create_engine(conn_str)
engine_azure.connect()
print('Connection to the Azure Db is ok')

connection_url = URL.create(
    drivername='mssql+aioodbc',
    username=os.environ["AZURE_DB_USERNAME"],
    password=os.environ["AZURE_DB_PASSWORD"],
    host=os.environ["AZURE_DB_HOST"],
    database=os.environ["AZURE_DB_DATABASE"],
    query={
        'driver': driver,
        'Encrypt': 'yes',
        'TrustServerCertificate': 'no',
        'Connection Timeout': '30'
    }
)

cl_data._data_layer = SQLAlchemyDataLayer(
    conninfo=connection_url,
    ssl_require=True,
    storage_provider=engine_azure,
    user_thread_limit=100)

Did I make a mistake somewhere?

@tpatel
Collaborator

tpatel commented Apr 16, 2024

@JeanRessouche I added some docs yesterday to clarify the scope of this release. For now, this has only been tested with PostgreSQL.

I'll be happy to guide you if you choose to add support for Azure SQL!

@JeanRessouche

JeanRessouche commented Apr 16, 2024

Thanks a lot @tpatel, the doc helps, but it's still a little cloudy for me.

Adding support for Azure SQL is definitely something I'm willing to do, but not in the short term, so I'm trying the Azure Data Lake route.

Based on the doc, I'm facing a blocker with the conninfo variable.
For me, an Azure Data Lake Gen2 is basically a storage account (for which I can provide the account name and SAS token), so I have no idea how to get the conninfo content, which looks like SQL server details here. Does it require Azure Synapse on top of the Data Lake Gen2?

[EDIT] Yeah, I certainly need Synapse. I configured one; now I'm lost with the incompatible DDL, as the Data Lake endpoint seems to be SQL Server compatible.

@hayescode
Contributor Author

For the tags, I'm not sure what the Azure SQL equivalent is, but it's a list of strings in that column.

For the data lake, I am using ADLS Gen2. 'User not persisted' sounds like this isn't getting configured properly. Pass the account URL and credential (managed identity or access key). Try a test script to write some test data.
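For dialects without an array type (such as Azure SQL), one possible workaround, not part of this PR, is to JSON-encode the tag list into a plain text column:

```python
import json

tags = ["support", "azure"]

# Store in an NVARCHAR/TEXT column instead of TEXT[]
stored = json.dumps(tags)

# Decode on read to get the list back
assert json.loads(stored) == tags
```

The trade-off is that you lose native array operators in WHERE clauses and have to filter tags in application code (or with the dialect's JSON functions, where available).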

@JeanRessouche

I'm curious how you created the DDL in ADLS Gen2 on your side; I was only able to connect to it with the SQL Server driver, so the Postgres DDL won't work.

So far I was only able to make it work properly with a Postgres server (which is great already!); I failed with ADLS Gen2 and Azure SQL Database.

@hayescode
Contributor Author

I'm curious about how you created the DDL in ADLS gen 2 on your side

I'm not sure what you mean; there is no DDL for ADLS. All you need is the account_url, credential, and container name. Optionally add sas_token to append to the URL to give users access. Once this is provided, the code should handle it all and log any errors.

@JeanRessouche

Hm, OK, I'll have to retry then. I did that, but it wasn't working, certainly because I did not figure out what to put in conninfo when using ADLS.

@hayescode
Contributor Author

You don't put it in the conninfo; you instantiate the ADLS class and pass it as storage_provider.

import os

import chainlit.data as cl_data
from chainlit.data.sql_alchemy import SQLAlchemyDataLayer
from chainlit.data.storage_clients import AzureStorageClient

storage_client = AzureStorageClient(
    account_url="https://<your_account>.dfs.core.windows.net",
    container="<your_container>",
    credential="<your_credential>",  # access key or azure.identity credential
    sas_token=os.environ.get("ADLS_SAS_TOKEN"),  # optional
)
cl_data._data_layer = SQLAlchemyDataLayer(
    conninfo=os.environ["SQLALCHEMY_CONNINFO"],
    ssl_require=True,
    storage_provider=storage_client,
    user_thread_limit=100,
)

@JeanRessouche

Hm, thanks, but there is still something I'm missing here: you ask me not to set conninfo, but you do set it in your answer, and in the description above.

On the other hand, if I simply don't provide it, or leave it empty, I get an error, as it's a required parameter.

For ADLS I have no clue what is expected in the SQLALCHEMY_CONNINFO variable:

<dialect>+<driver>://<user>:<password>@<host>:<port>/<database>

@hayescode
Contributor Author

There are 2 clients. The conninfo is for the SQL database; the ADLS client is passed to the SQLAlchemyDataLayer via storage_provider. I think you're conflating these.

@Reymond190

In the next commits, please fix this:
I tested the SQLAlchemy data layer using Azure AD OAuth. There is a validation error when the whole dict is used in PersistedUser(**user_data); it works after metadata is removed from the dict.

ERROR:
File "D:\git_projects\mvenv_504\lib\site-packages\chainlit\data\sql_alchemy.py", line 114, in get_user
    return PersistedUser(**user_data)
File "D:\git_projects\mvenv_504\lib\site-packages\pydantic\_internal\_dataclasses.py", line 140, in __init__
    s.__pydantic_validator__.validate_python(ArgsKwargs(args, kwargs), self_instance=s)
pydantic_core._pydantic_core.ValidationError: 1 validation error for PersistedUser
metadata
  Input should be a valid dictionary [type=dict_type, input_value='{"image": "data:image/jp..."provider": "azure-ad"}', input_type=str]
    For further information visit https://errors.pydantic.dev/2.7/v/dict_type
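Until this is fixed upstream, one hedged workaround is to decode metadata before constructing PersistedUser when the driver hands the JSONB column back as a string. The user_data dict below is a made-up example shaped like the error message, not real data from the report.

```python
import json

# Example row where the DB driver returned the JSONB column as a string
user_data = {"identifier": "alice", "metadata": '{"provider": "azure-ad"}'}

# Normalize: pydantic expects a dict, not a JSON-encoded string
if isinstance(user_data.get("metadata"), str):
    user_data["metadata"] = json.loads(user_data["metadata"])

assert user_data["metadata"] == {"provider": "azure-ad"}
```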
