Adding Initial Value to Subject using start_with doesn't work for subscribe with AsyncIOScheduler #579

Open
lamkenn opened this issue Aug 12, 2021 · 1 comment

lamkenn commented Aug 12, 2021

Hi,

I am trying to combine Subject with the start_with operator: I want each observer to receive the initial image via start_with and the subsequent updates via the Subject's on_next.

However, I am not getting the expected result:

import asyncio
from rx.scheduler.eventloop import AsyncIOScheduler
from rx.subject import Subject
import rx.operators as ops
loop = asyncio.get_event_loop()
aio_scheduler = AsyncIOScheduler(loop=loop)

subject_test = Subject()
sub_obs = subject_test.pipe(ops.start_with(999))
sub_obs.subscribe(lambda x: print(f'subject1: {x}'), scheduler=aio_scheduler)
subject_test.on_next(456)
loop.run_forever()

Output:
subject1: 999

Expected Output:
subject1: 999
subject1: 456

If I omit scheduler=aio_scheduler from subscribe(), I get the expected output.
From what I found, when I use AsyncIOScheduler, sub_obs fails to add the observer to the subject's observer list during subscribe().

lamkenn changed the title from "Adding Initial Value to Subject using start_with doesn't work with AsyncIOScheduler" to "Adding Initial Value to Subject using start_with doesn't work for subscribe with AsyncIOScheduler" on Aug 12, 2021
Collaborator

MainRo commented Aug 28, 2021

Item 456 is not emitted because it is pushed to the subject before the subscription to the subject has taken place. You need to either delay the emission or use a ReplaySubject, so that items are buffered until the subscription to the subject is in place:

import asyncio
from rx.scheduler.eventloop import AsyncIOScheduler
from rx.subject import ReplaySubject
import rx.operators as ops
loop = asyncio.get_event_loop()
aio_scheduler = AsyncIOScheduler(loop=loop)

# ReplaySubject buffers pushed items and replays them to observers that subscribe later
subject_test = ReplaySubject()
sub_obs = subject_test.pipe(ops.start_with(999))
sub_obs.subscribe(lambda x: print(f'subject1: {x}'), scheduler=aio_scheduler)
subject_test.on_next(456)
loop.run_forever()
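
The timing problem described above can be reproduced without RxPY. The following is only a rough sketch using plain asyncio: ToySubject, ToyReplaySubject and run are hypothetical names made up for this illustration and are not RxPY classes or its actual implementation. It assumes that subscribing through an event-loop scheduler behaves like a callback queued on the loop, so nothing is attached to the subject until the loop runs.

```python
import asyncio


class ToySubject:
    """Hypothetical stand-in for a plain subject: items pushed while no
    observer is attached are lost."""

    def __init__(self):
        self.observers = []

    def subscribe(self, observer):
        self.observers.append(observer)

    def on_next(self, value):
        for observer in list(self.observers):
            observer(value)


class ToyReplaySubject(ToySubject):
    """Hypothetical stand-in for a replaying subject: remembers items so
    that late observers still receive them."""

    def __init__(self):
        super().__init__()
        self.buffer = []

    def subscribe(self, observer):
        for value in self.buffer:  # replay what the observer missed
            observer(value)
        super().subscribe(observer)

    def on_next(self, value):
        self.buffer.append(value)
        super().on_next(value)


def run(subject, delay_emission=False):
    """Subscribe via the event loop, push 456, return what the observer saw."""
    received = []
    loop = asyncio.new_event_loop()
    try:
        # Assumption: the subscription is only queued here and is attached
        # once the loop actually runs.
        loop.call_soon(subject.subscribe, received.append)
        if delay_emission:
            loop.call_soon(subject.on_next, 456)  # queued after the subscription
        else:
            subject.on_next(456)  # pushed before the loop has run
        loop.call_soon(loop.stop)
        loop.run_forever()
    finally:
        loop.close()
    return received


print(run(ToySubject()))                       # []
print(run(ToySubject(), delay_emission=True))  # [456]
print(run(ToyReplaySubject()))                 # [456]
```

In this toy model a single call_soon is enough to delay the emission. In the real pipeline the subscription to the subject may be set up over several scheduled steps, so a delayed emission would have to wait until all of them have run; buffering with a ReplaySubject avoids having to guess that delay.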
