Replies: 3 comments 1 reply
-
I notice you're using a sync client on an async endpoint. Could you test with an async client?
-
As mentioned, the sync test behaves the same as the async test, but find the async test code below:

import asyncio
import sys
from collections.abc import AsyncIterator

import pytest
from litestar.channels import ChannelsPlugin
from litestar.testing import AsyncTestClient, TestClient
from httpx_sse import aconnect_sse, EventSource, connect_sse

from app import app

if sys.platform == "win32":
    asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy())

pytestmark = pytest.mark.anyio


@pytest.fixture
def anyio_backend() -> str:
    return "asyncio"


@pytest.fixture(name="client")
async def fx_client() -> AsyncIterator[AsyncTestClient]:
    async with AsyncTestClient(app=app) as client:
        yield client


@pytest.fixture(name="channels")
def fx_channels(client: AsyncTestClient) -> ChannelsPlugin:
    return client.app.plugins.get(ChannelsPlugin)


async def test_example_sse(client: AsyncTestClient):
    async with aconnect_sse(client, "GET", f"{client.base_url}/example-sse") as event_source:
        events = [sse async for sse in event_source.aiter_sse()]
        assert len(events) == 5
        for idx, sse in enumerate(events, start=1):
            assert sse.event == "special"
            assert sse.data == str(idx)
            assert sse.id == "123"
            assert sse.retry == 1000


async def test_notification(client: AsyncTestClient, channels: ChannelsPlugin):
    topic = "demo"
    message = {"hello": "world"}
    async with aconnect_sse(
        client,
        "GET",
        f"{client.base_url}/{topic}/notification",
        timeout=1,
    ) as event_source:
        channels.publish(message, [topic])
        # anext() returns an awaitable, so it has to be awaited to get the event
        received = await anext(event_source.aiter_sse())
        print(received)
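Since the test above can wait forever on the first event, one option is to bound that wait so the test fails quickly instead of hanging. The following is a minimal sketch using only the standard library; `next_event`, `silent_stream` and `ready_stream` are hypothetical names that stand in for the real event source, and nothing here is part of Litestar or httpx-sse. It will not help if the hang happens earlier, while the connection is still being opened.

```python
import asyncio
from collections.abc import AsyncIterator


async def next_event(events: AsyncIterator[str], timeout: float) -> str:
    """Return the next item, or raise a timeout error instead of waiting forever."""

    async def _next() -> str:
        return await events.__anext__()

    return await asyncio.wait_for(_next(), timeout)


async def silent_stream() -> AsyncIterator[str]:
    """Stand-in for a stream that never produces an event."""
    await asyncio.Event().wait()  # never set, so this waits until cancelled
    yield "unreachable"


async def ready_stream() -> AsyncIterator[str]:
    """Stand-in for a stream that has an event available immediately."""
    yield "hello"


async def demo() -> tuple[str, bool]:
    first = await next_event(ready_stream(), timeout=1.0)
    try:
        await next_event(silent_stream(), timeout=0.1)
        timed_out = False
    except asyncio.TimeoutError:
        timed_out = True
    return first, timed_out
```

Running `asyncio.run(demo())` reads the available event from the first stream and times out on the second, which is the behaviour a test would want in place of an indefinite hang.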
-
Spent a little time on this, and yeah, it's blocking somewhere in the channels publishing stack. Couldn't work out why, though.
-
Hi, I am a bit stuck and maybe I'm just missing the obvious.
For this I used:
I have an SSE endpoint which shall forward data from a ChannelsPlugin topic. In the example below, I use another endpoint to publish on this topic, but this is mostly for demonstration's sake when testing it with an httpx script.
I also included the unit test from https://github.com/litestar-org/litestar/blob/main/tests/unit/test_response/test_sse.py#L16 just to check that I'm not crazy. Anyway.
The SSE endpoint implementation follows https://github.com/orgs/litestar-org/discussions/3104#discussioncomment-8453195
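To make the intended data flow concrete, here is a minimal standard-library sketch of the pattern: a publisher enqueues messages and an async generator forwards each one as an SSE frame. It deliberately uses an `asyncio.Queue` as a stand-in for the channels backend, so `format_sse`, `forward_topic` and the `None` sentinel are illustrative assumptions, not Litestar's API.

```python
import asyncio
import json
from collections.abc import AsyncIterator


def format_sse(data: str, event: str = "message") -> str:
    """Encode one event in the text/event-stream wire format."""
    return f"event: {event}\ndata: {data}\n\n"


async def forward_topic(queue: asyncio.Queue) -> AsyncIterator[str]:
    """Yield one SSE frame per published message; waits while the queue is empty."""
    while True:
        message = await queue.get()
        if message is None:  # hypothetical sentinel that ends the stream
            return
        yield format_sse(json.dumps(message))


async def demo() -> list[str]:
    queue: asyncio.Queue = asyncio.Queue()
    # Publishing only enqueues; it does not wait for a subscriber to read.
    queue.put_nowait({"hello": "world"})
    queue.put_nowait(None)
    return [frame async for frame in forward_topic(queue)]
```

The point of the sketch is that the generator suspends on `queue.get()` without blocking the event loop, so other requests can still be served while the subscriber waits.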
When running the script for testing it works as expected:
However - I did observe that there is a delay on the SSE endpoint. It takes about 3 seconds for the subscriber to connect to the SSE endpoint and while this happens other requests are also blocked, e.g., the POST request only returns AFTER the SSE connection has been established.
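Other requests stalling while the SSE connection is set up is what one would see if something blocks the event loop, although that is only a guess here. A rough way to check is to run a small monitor task that measures how late its own wake-ups are. The sketch below uses only asyncio; `measure_max_lag`, `blocking_handler` and `cooperative_handler` are hypothetical names, and the two handlers merely simulate a blocking and a non-blocking code path.

```python
import asyncio
import time
from collections.abc import Awaitable, Callable


async def measure_max_lag(duration: float, interval: float = 0.01) -> float:
    """Return the worst delay (in seconds) between requested and actual wake-up."""
    loop = asyncio.get_running_loop()
    max_lag = 0.0
    deadline = loop.time() + duration
    while loop.time() < deadline:
        start = loop.time()
        await asyncio.sleep(interval)
        max_lag = max(max_lag, loop.time() - start - interval)
    return max_lag


async def blocking_handler() -> None:
    time.sleep(0.3)  # synchronous sleep: nothing else can run on the loop


async def cooperative_handler() -> None:
    await asyncio.sleep(0.3)  # yields to the loop while waiting


async def lag_while(handler: Callable[[], Awaitable[None]]) -> float:
    monitor = asyncio.create_task(measure_max_lag(0.5))
    await asyncio.sleep(0)  # give the monitor a chance to start
    await handler()
    return await monitor
```

A large lag while the SSE subscriber connects would point at a synchronous call somewhere in that path. Running the application with asyncio's debug mode enabled is another option, since it logs callbacks that take unusually long.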
But at least it works... When I run pytest, it just gets stuck and never finishes. I used the debugger to figure out where it gets stuck, and it's deep in the httpx code, where it waits on transport.handle_request(request) and never returns. So it might be a question for the httpx repo. I also tested this with an async pytest setup, and I tried using threading or asyncio tasks to somehow circumvent the "getting stuck" thing, but neither works.
I was also wondering if I'm just doing something fundamentally wrong with setting up the SSE endpoint. I just find it weird that it DOES work without pytest, even though it has this delay. I am aware that subscriber.iter_events() just blocks when there is nothing to retrieve, but since it DOES work with the standalone script, I had expected that it would also somehow work with pytest.
App
Pytest
Script for testing