forked from Chainlit/chainlit
-
Notifications
You must be signed in to change notification settings - Fork 0
/
storage_clients.py
58 lines (52 loc) · 2.94 KB
/
storage_clients.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
from chainlit.data import BaseStorageClient
from chainlit.logger import logger
from typing import TYPE_CHECKING, Optional, Dict, Union, Any
from azure.storage.filedatalake import DataLakeServiceClient, FileSystemClient, DataLakeFileClient, ContentSettings
import boto3 # type: ignore
if TYPE_CHECKING:
from azure.core.credentials import AzureNamedKeyCredential, AzureSasCredential, TokenCredential
class AzureStorageClient(BaseStorageClient):
    """
    Storage client backed by Azure Data Lake Storage (ADLS) Gen2.

    Args:
        account_url: Account endpoint, e.g.
            "https://<your_account>.dfs.core.windows.net".
        container: Name of the file system (container) that uploads are
            written to.
        credential: Access credential, passed unchanged to
            ``DataLakeServiceClient``.
        sas_token: Optional SAS token added as the query string of the URLs
            returned by ``upload_file``. Accepted with or without a leading
            "?".
    """

    def __init__(
        self,
        account_url: str,
        container: str,
        credential: Optional[
            Union[
                str,
                Dict[str, str],
                "AzureNamedKeyCredential",
                "AzureSasCredential",
                "TokenCredential",
            ]
        ],
        sas_token: Optional[str] = None,
    ):
        # A plain assignment cannot fail, so it stays outside the try block and
        # the attribute exists even when client construction fails.
        self.sas_token = sas_token
        try:
            self.data_lake_client = DataLakeServiceClient(
                account_url=account_url, credential=credential
            )
            self.container_client: FileSystemClient = (
                self.data_lake_client.get_file_system_client(file_system=container)
            )
            logger.info("AzureStorageClient initialized")
        except Exception as e:
            # Best-effort initialization: the failure is logged, not raised.
            # Later uploads will then fail and be reported by upload_file.
            logger.warning(f"AzureStorageClient initialization error: {e}")

    @staticmethod
    def _with_sas_token(url: str, sas_token: Optional[str]) -> str:
        """Return ``url`` with ``sas_token`` attached as a query string."""
        if not sas_token:
            return url
        # Tokens are handed out both with and without a leading "?"; drop any
        # leading separator so exactly one is inserted below.
        token = sas_token.lstrip("?&")
        if not token:
            return url
        separator = "&" if "?" in url else "?"
        return f"{url}{separator}{token}"

    async def upload_file(
        self,
        object_key: str,
        data: Union[bytes, str],
        mime: str = "application/octet-stream",
        overwrite: bool = True,
    ) -> Dict[str, Any]:
        """
        Upload ``data`` to ``object_key`` inside the configured container.

        Args:
            object_key: Path of the file within the container.
            data: Content to upload.
            mime: Content type stored with the file.
            overwrite: Forwarded to ``upload_data``.

        Returns:
            ``{"object_key": ..., "url": ...}`` on success, where the URL
            carries the SAS token when one was configured; an empty dict when
            the upload fails (the error is logged, not raised).
        """
        try:
            file_client: DataLakeFileClient = self.container_client.get_file_client(
                object_key
            )
            content_settings = ContentSettings(content_type=mime)
            # NOTE(review): this is the synchronous SDK client, so the call
            # below blocks the event loop until the upload finishes.
            file_client.upload_data(
                data, overwrite=overwrite, content_settings=content_settings
            )
            url = self._with_sas_token(file_client.url, self.sas_token)
            return {"object_key": object_key, "url": url}
        except Exception as e:
            logger.warning(f"AzureStorageClient, upload_file error: {e}")
            return {}
class S3StorageClient(BaseStorageClient):
    """
    Storage client backed by Amazon S3.

    Args:
        bucket: Name of the bucket that uploads are written to.
    """

    def __init__(self, bucket: str):
        # A plain assignment cannot fail, so it stays outside the try block and
        # the attribute exists even when client construction fails.
        self.bucket = bucket
        try:
            self.client = boto3.client("s3")
            logger.info("S3StorageClient initialized")
        except Exception as e:
            # Best-effort initialization: the failure is logged, not raised.
            # Later uploads will then fail and be reported by upload_file.
            logger.warning(f"S3StorageClient initialization error: {e}")

    async def upload_file(
        self,
        object_key: str,
        data: Union[bytes, str],
        mime: str = "application/octet-stream",
        overwrite: bool = True,
    ) -> Dict[str, Any]:
        """
        Upload ``data`` to ``object_key`` in the configured bucket.

        Args:
            object_key: Key of the object within the bucket.
            data: Content to upload.
            mime: Content type stored with the object.
            overwrite: Accepted for signature parity with the other storage
                clients; this method does not use it.

        Returns:
            ``{"object_key": ..., "url": ...}`` on success; an empty dict when
            the upload fails (the error is logged, not raised).
        """
        # Local import keeps this block self-contained.
        from urllib.parse import quote

        try:
            # NOTE(review): put_object is a synchronous call, so it blocks the
            # event loop until the upload finishes.
            self.client.put_object(
                Bucket=self.bucket, Key=object_key, Body=data, ContentType=mime
            )
            # Percent-encode the key so characters such as spaces, "#" or "?"
            # do not corrupt the URL; "/" is kept as the path separator.
            encoded_key = quote(object_key, safe="/")
            url = f"https://{self.bucket}.s3.amazonaws.com/{encoded_key}"
            return {"object_key": object_key, "url": url}
        except Exception as e:
            logger.warning(f"S3StorageClient, upload_file error: {e}")
            return {}