S3 - get_object - Read doesn't work #991

Open
6 tasks done
Sharleedah opened this issue Jan 24, 2023 · 12 comments

@Sharleedah

Describe the bug
I wanted to move to aiobotocore for my AWS handling, but unfortunately I am running into the following problem.

Use case: I want to fetch an object from S3 with the following code:

session = get_session()
async with session.create_client('s3', region_name="eu-central-1") as client:
    try:
        obj = await client.get_object(Bucket=cls.bucket_name, Key=path)
    except ClientError as ex:
        if ex.response['Error']['Code'] != 'NoSuchKey': raise ex 
        raise ObjectMissingError("Template", path)
    finally:
        await client.close()

From the returned obj I want to read the data and return it from my web server with the following code:

@user_router.get("/storage/{filename}", responses=response_get_file, summary="Get file")
async def get_file(filename:str = Path(..., description="Name of the file to be returned", example="test.jpg")):
    """Get file"""
    content = await AmazonStorageHandler.get_file(path=filename)
    return StreamingResponse(content= content["Body"].iter_chunks(), media_type=content["ContentType"])

The problem is that the web server stops replying and seems to be stuck in an infinite while loop. The following code yields the same behaviour, so the web server is not responsible for it:

data=await obj.read()

Checklist

  • I have reproduced in environment where pip check passes without errors
  • I have provided pip freeze results
  • I have provided sample code or detailed way to reproduce
  • I have tried the same code in botocore to ensure this is an aiobotocore specific issue
  • I have tried similar code in aiohttp to ensure this is an aiobotocore specific issue
  • I have checked the latest and older versions of aiobotocore/aiohttp/python to see if this is a regression / injection

pip freeze results

aioboto3==10.3.0
aiobotocore==2.4.1
aiofiles==22.1.0
aiohttp==3.8.3
aioitertools==0.11.0
aiosignal==1.3.1
aiosmtplib==2.0.1
aiosqlite==0.18.0
alembic==1.9.2
amqp==5.1.1
anyio==3.6.2
async-timeout==4.0.2
asyncpg==0.27.0
attrs==22.2.0
bcrypt==4.0.1
beautifulsoup4==4.11.1
billiard==3.6.4.0
boto3==1.24.59
botocore==1.27.59
celery==5.2.7
certifi==2022.12.7
charset-normalizer==2.1.1
click==8.1.3
click-didyoumean==0.3.0
click-plugins==1.1.1
click-repl==0.2.0
dnspython==2.3.0
email-validator==1.3.1
eventlet==0.33.3
fastapi==0.89.1
ffmpeg-python==0.2.0
frozenlist==1.3.3
future==0.18.3
greenlet==2.0.1
gunicorn==20.1.0
h11==0.14.0
html-sanitizer==1.9.3
idna==3.4
itsdangerous==2.1.2
Jinja2==3.1.2
jmespath==1.0.1
kombu==5.2.4
lorem==0.1.1
lxml==4.9.2
Mako==1.2.4
Markdown==3.4.1
MarkupSafe==2.1.2
multidict==6.0.4
passlib==1.7.4
Pillow==9.4.0
prompt-toolkit==3.0.36
psycopg2==2.9.5
pycountry==22.3.5
pydantic==1.10.4
PyJWT==2.6.0
python-dateutil==2.8.2
python-dotenv==0.21.1
python-multipart==0.0.5
pytz==2022.7.1
PyYAML==6.0
qrcode==7.3.1
redis==4.4.2
requests==2.28.2
s3transfer==0.6.0
six==1.16.0
sniffio==1.3.0
soupsieve==2.3.2.post1
SQLAlchemy==1.4.46
starlette==0.22.0
typing_extensions==4.4.0
urllib3==1.26.14
uvicorn==0.20.0
vine==5.0.0
wcwidth==0.2.6
wrapt==1.14.1
yarl==1.8.2

Environment:

  • Python Version: [e.g. 3.10]
  • Docker: python:3.10-slim
@thehesiod
Collaborator

  1. Tip: you should use except client.exceptions.NoSuchKey:
  2. You can't close the aiobotocore client until after the response has been fully streamed to the HTTP client. The aiobotocore client should live for the lifetime of the server rather than be created per response. Remember that each client has by default 10 connections that can be used in parallel; this can be configured via the Config object.
  3. I think you just want StreamingResponse(content=content["Body"].content, ...), as content is a "payload" IIRC.
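
A minimal sketch of the first two tips, offered as an illustration rather than as the maintainers' recommended code. ObjectMissingError mirrors the exception name in the report; get_object_or_raise and shared_s3_client are hypothetical helper names, and the region is copied from the original snippet. The pool size is set through AioConfig(max_pool_connections=...), and the shared client is meant to be entered once at application startup and exited at shutdown.

```python
import contextlib


class ObjectMissingError(Exception):
    """Hypothetical application error, mirroring the one in the report."""


async def get_object_or_raise(client, bucket, key):
    """Fetch an object, translating a missing key into ObjectMissingError."""
    try:
        return await client.get_object(Bucket=bucket, Key=key)
    except client.exceptions.NoSuchKey as exc:
        # Modeled exception class exposed by the client, instead of
        # inspecting ex.response['Error']['Code'] by hand.
        raise ObjectMissingError(key) from exc


@contextlib.asynccontextmanager
async def shared_s3_client(max_pool_connections=10):
    """Open one client for the whole application, not one per request."""
    # Imported lazily so the helper above can be used (and tested)
    # without aiobotocore being importable.
    from aiobotocore.config import AioConfig
    from aiobotocore.session import get_session

    config = AioConfig(max_pool_connections=max_pool_connections)
    async with get_session().create_client(
        "s3", region_name="eu-central-1", config=config
    ) as client:
        yield client
```

Because the client is passed in, request handlers never open or close it themselves; the response body can then be streamed while the client is still alive.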

@Sharleedah
Author

Thanks for your answer! You are absolutely right about the second one. But are you sure about the third one? Shouldn't the data be chunked in order to use a StreamingResponse? I've tried it with:

StreamingResponse(content=content["Body"], ... # loads only first chunk and then seems to be stuck in while loop
StreamingResponse(content=content["Body"].content, ... # Problem: no iterator
StreamingResponse(content=content["Body"].iter_chunks(),... # loads only first chunk and then seems to be stuck in while loop

as well as writing my own generator like this:

async def stream():
    async for chunk in content["Body"]:
        yield chunk
return StreamingResponse(content=stream(),...

But I got it to work like this (which is not a valid solution in the end, because it reads everything into memory first):

@user_router.get("/storage/{filename}", responses=response_get_file, summary="Get file")
async def get_file(filename:str = Path(..., description="Name of the file to be returned", example="test.png")):
    """Get file"""
    content = await AmazonStorageHandler.get_file(path=filename)
    data = await content["Body"].read()
    return Response(content=data, media_type=content["ContentType"])

Do you have any idea how I can make it work with a StreamingResponse, given that using a generator doesn't seem to work on the objects I receive from aiobotocore?

@thehesiod
Collaborator

Whoops, yes. Looking again, StreamResponse wants you to write the data bit by bit, so that it streams back to the client while you write. If you look at aiohttp.web_response.Response, the body setter checks whether the body is a payload type. IIRC .content is a payload type, so you could do something like response = Response(...) followed by response.body = content["Body"].content, and I think that should let you stream the payload back to the client.

@thehesiod
Collaborator

do report back as I'd like to keep that trick in my back pocket :)

@Sharleedah
Author

Thanks again for the fast response! I've tried aiohttp.web_response.Response with content["Body"].content, but unfortunately it didn't work (only about 630 bytes were transferred instead of the 90 KB the file has, for example).

Do you have any idea how to make StreamingResponse work? Normally it should work with .iter_chunks(), as it already did with native botocore.

@thehesiod
Collaborator

will try to check on this again asap

@tw4l

tw4l commented Mar 30, 2023

I believe that I've hit this same issue attempting to stream the partial contents of a file with aiobotocore. My use case is that I am extracting a file from an uncompressed zip stored in an S3 bucket (testing with a local MinIO container) via a range request. I get the full content of the file when downloading it all into memory, as in the following example snippet:

response = await client.get_object(
    Bucket=bucket, Key=key, Range=f"bytes={start}-{end}"
)
content = await response["Body"].read()
...

However, this is inefficient for large files, so I would like to be able to stream the contents of the response body via an async generator.

If I change the above to:

response = await client.get_object(
    Bucket=bucket, Key=key, Range=f"bytes={start}-{end}"
)
stream = response["Body"]
return stream_generator(stream)

async def stream_generator(stream):
    async for line in stream.iter_lines():
        parsed_line = do_stuff_to_line(line)
        yield parsed_line

I get the first ~60-100 lines of the file (out of ~2800) and then the stream suddenly stops. The same thing happens if I switch to stream.iter_chunks(): with the default chunk_size it receives about 10 chunks, then a partial chunk, then nothing.

In my case this async generator is then streamed via a fastapi.responses.StreamingResponse, but the problem seems to occur before that point and I can reproduce without fastapi.

tw4l added a commit to webrecorder/browsertrix that referenced this issue Mar 30, 2023
Not working due to what appears to be an aibotocore bug - see:
aio-libs/aiobotocore#991
tw4l added a commit to webrecorder/browsertrix that referenced this issue Mar 30, 2023
If a crawl is completed, the endpoint streams the logs from the log
files in all of the created WACZ files, sorted by timestamp.

The API endpoint supports filtering by log_level and context whether
the crawl is still running or not.

This is not yet proper streaming because the entire log file is read
into memory before being streamed to the client. We will want to
switch to proper streaming eventually, but are currently blocked by
an aiobotocore bug - see:

aio-libs/aiobotocore#991
tw4l added a commit to webrecorder/browsertrix that referenced this issue Apr 11, 2023
If a crawl is completed, the endpoint streams the logs from the log
files in all of the created WACZ files, sorted by timestamp.

The API endpoint supports filtering by log_level and context whether
the crawl is still running or not.

This is not yet proper streaming because the entire log file is read
into memory before being streamed to the client. We will want to
switch to proper streaming eventually, but are currently blocked by
an aiobotocore bug - see:

aio-libs/aiobotocore#991
@thehesiod
Collaborator

anyone have a sec to write a server/client demo in one python file? If not I'll take some time to write one to help debug soon

@katichev

It seems that we've faced the same issue when trying to stream the contents of a response body with aiohttp to another service. Under the hood our code does the following:

import aiohttp

# create a client instance on app startup
# ...
response = await client.get_object(Bucket=bucket, Key=key)
content = response['Body']

async with aiohttp.ClientSession() as session:
    async with session.post('http://downstream', data=content) as resp:
        ...  # process response

Sometimes the client just stops working and hangs until the app is restarted.

The problem is that StreamingBody actually wraps ClientResponse, and the latter should be explicitly closed at the end to free the underlying connection. This is stated in README.md, but it might be unclear in cases where the AsyncIterator protocol is applied to the response body.

In my case the code above may be modified as follows:

async with content:    # <- enter underlying ClientResponse cm
    async with aiohttp.ClientSession() as session:
        async with session.post('http://...', data=content) as resp:
            ...  # process response

Hope this helps!
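
The same idea can be sketched as an async generator, for the case where the body is handed to something that consumes an iterator (for example a streaming response). This is only an illustration under assumptions, not a confirmed fix for this issue: iter_body_chunks is a hypothetical helper name, the 64 KiB chunk size is arbitrary, and body is assumed to be the StreamingBody returned under response["Body"]. The body is entered with a plain async with and no "as" target, matching the snippet above.

```python
async def iter_body_chunks(body, chunk_size=64 * 1024):
    """Yield chunks from a streaming body and release it when done.

    The body stays inside its context manager for as long as the
    generator is being consumed, so the underlying response is closed
    once iteration finishes or fails.
    """
    async with body:
        async for chunk in body.iter_chunks(chunk_size):
            yield chunk
```

If the consumer stops iterating early, the body is only released when the generator itself is closed (for example via aclose()), so callers that may abandon a stream should close the generator explicitly. The client that produced the body still has to stay open while the generator runs.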

@thehesiod
Collaborator

technically it's supposed to GC the connection when it's no longer referenced, so there may be a bug in aiohttp

@katichev

UPD: I created a small test server that ignores the response body. For every request, it does the following:

  1. call get_object for some huge file
  2. put StreamingBody.__wrapped__ into a WeakValueDictionary in global scope
  3. respond with 200 OK

After it gets stuck, I check my weakref dict with objgraph.
This is what it shows:

[image: refs]

As far as I can see it cannot be GC-ed, right?

aiobotocore==2.6.0, aiohttp==3.8.5, python3.11
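
The tracking technique in step 2 can be sketched with the standard library alone. This is a toy illustration, not the test server from the comment: TrackedResponse, track, alive_count, and holder are hypothetical names, and the list stands in for whatever object still holds a reference in the real application.

```python
import gc
import weakref


class TrackedResponse:
    """Stand-in for the object being tracked (hypothetical)."""


# Entries disappear automatically once their value is garbage-collected.
tracked = weakref.WeakValueDictionary()


def track(obj):
    """Remember obj without keeping it alive."""
    tracked[id(obj)] = obj


def alive_count():
    """Number of tracked objects still alive after a full collection."""
    gc.collect()
    return len(tracked)


released = TrackedResponse()
track(released)
del released  # no strong references remain, so the entry vanishes

leaked = TrackedResponse()
track(leaked)
holder = [leaked]  # simulates something that still references the object
del leaked

print(alive_count())  # prints 1: only the object kept alive by holder remains
```

Any entry that survives gc.collect() is still strongly referenced somewhere, which is the point at which a tool such as objgraph helps find the holder.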

@thehesiod
Collaborator

By the way, in the original code you're closing the client; you can't close the client if you're going to stream the object back out. You also don't need to call close on the client, as you're already using the context manager. Could you update your example, please?
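
The lifetime problem described here can be modelled with plain asyncio. The sketch below is a toy, not aiobotocore: ToyClient is a hypothetical stand-in that raises once it has been closed, whereas the reports in this thread describe hangs and truncated output rather than an exception. It only shows why a lazily consumed stream must not outlive the context manager that produced it.

```python
import asyncio


class ToyClient:
    """Hypothetical client whose streams stop working once it is closed."""

    def __init__(self):
        self.closed = False

    async def __aenter__(self):
        return self

    async def __aexit__(self, *exc_info):
        self.closed = True

    async def stream(self):
        for chunk in (b"a", b"b", b"c"):
            if self.closed:
                raise RuntimeError("client closed before the stream was consumed")
            yield chunk


async def open_then_return_stream():
    # Mirrors the original report: the client is closed on return,
    # before anyone has read from the lazy stream.
    async with ToyClient() as client:
        return client.stream()


async def consume_with_long_lived_client(client):
    # The caller owns the client and keeps it open while streaming.
    return [chunk async for chunk in client.stream()]


async def demo():
    stream = await open_then_return_stream()
    try:
        early = [chunk async for chunk in stream]
    except RuntimeError:
        early = None  # the stream was unusable after the client closed
    async with ToyClient() as client:
        late = await consume_with_long_lived_client(client)
    return early, late
```

Running asyncio.run(demo()) in this toy model yields no data from the first pattern and all three chunks from the second.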
