Sample does not appear to emit on_completed #645

Open
markovejnovic opened this issue May 16, 2022 · 3 comments

Comments

@markovejnovic

markovejnovic commented May 16, 2022

Describe the bug
It appears to me that running operators.sample prevents the on_completed event from getting fired, while on_error works as expected.

To Reproduce
Steps to reproduce the behavior:

  1. Create a subject that is fed with a custom object periodically.
  2. Subscribe to the subject with an operators.sample filter.
  3. Call on_completed
  4. Notice it doesn't get emitted to the subscribed observer.

Expected behavior
on_completed is correctly called.

Code or Screenshots

subject = rx.Subject()
def feed_subject(subject: rx.Subject):
    for i in range(100):
        subject.on_next(f'Hello World {i}')
        time.sleep(1e-2)
provider_thread = threading.Thread(target=feed_subject, args=(subject, ))
window = ObserverMainWindow(test_view, subject)

provider_thread.start()
provider_thread.join()

subject.on_completed()

ObserverMainWindow is a function that looks something like:

def ObserverMainWindow(child, data: rx.Observable):
    # ...

    def rerender_window(state):
        # ...

    def close():  # Never fired
        print(f'closing', file=open('test.txt', '+a'))
        # ...

    def on_err(error):  # Correctly fired if `on_error` is called.
        print('closing error', file=open('test.txt', '+a'))
        # ...

    # This does not work
    data.pipe(rxops.sample(1 / 60)).subscribe(
        rerender_window, on_err, close)
    # This for example works as expected:
    # data.pipe(rxops.map(lambda x: x)).subscribe(
    #   rerender_window, on_err, close)

Additional context

I apologize in advance if I'm missing something obvious.

  • OS [e.g. Windows]: Fedora 36
  • RxPY version [e.g 4.0.0]: 4.0.0
  • Python version [e.g. 3.10.2]: 3.10.4

Edits

  1. Added an example of what works
  2. Formatting
  3. Typos
@dbrattli
Collaborator

Please submit a minimal, self-contained code example. Minimal means that there should not be any other code; e.g., what are test_view and child? Self-contained means that one should be able to copy the code and run it without having to guess which imports are being used.

@matiboy
Collaborator

matiboy commented Dec 22, 2022

Looks like you're not giving sample enough time to emit, because the main thread exits first.
Below is a simplified version that reproduces it. If the last time.sleep is shorter than the sample period, you will not see "I am complete"; with a value like 0.1 you will.

import time

import reactivex
from reactivex import operators

subject = reactivex.Subject()
subject.pipe(
    operators.sample(2.5*1e-2)
).subscribe(
    on_next=lambda x: print('NEXT', x),
    on_completed=lambda: print('I am complete')
)

# this has no effect on the story
def feed_subject(s: reactivex.Subject):
    for i in range(8):
        s.on_next(f'Hello World {i}')
        time.sleep(1e-2)

feed_subject(subject)
subject.on_completed()
time.sleep(1e-3) # <--- change this to 0.1 and you'll see the on_completed

The reason for this is that sample only forwards on_completed on the next sample tick. You can see this in the test below (which passes); I skipped the emitting part entirely and just completed the subject at 6.5 seconds. As you can see, the completion is only emitted at 8 = 0.5 (subscription starts) + 2.5 + 2.5 + 2.5 <- next sample, not at 0.5+6.5.

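import reactivex
from reactivex import operators
from reactivex.testing import ReactiveTest, TestScheduler

# Helper that builds the expected "completed at time t" record.
on_completed = ReactiveTest.on_completed

def test_issue_645_reproduce():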
    scheduler = TestScheduler()
    subject = reactivex.Subject()
    scheduler.schedule_relative(6.5, lambda *_: subject.on_completed())

    results = scheduler.start(
        lambda: subject.pipe(
            operators.sample(2.5)
        ), created=0.01, subscribed=0.5, disposed=20
    )
    assert results.messages == [
        on_completed(8)
    ]
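
The 8 in that assertion is just the first sampler tick after the source has completed. A minimal sketch of the arithmetic, assuming ticks fall at subscribed + k * period (the helper name next_sample_tick is made up for illustration and is not part of reactivex; a completion landing exactly on a tick is treated here as forwarded on that same tick, which is an assumption):

```python
import math


def next_sample_tick(subscribed: float, period: float, completed_at: float) -> float:
    """Hypothetical helper: time of the first sampler tick at or after completed_at."""
    # Sampler ticks are assumed to occur at subscribed + k * period, k = 1, 2, 3, ...
    k = max(1, math.ceil((completed_at - subscribed) / period))
    return subscribed + k * period


# Completion at 6.5 with subscription at 0.5 and a 2.5 period.
print(next_sample_tick(subscribed=0.5, period=2.5, completed_at=6.5))  # 8.0
# Reading the completion time as 0.5 + 6.5 = 7.0 gives the same tick.
print(next_sample_tick(subscribed=0.5, period=2.5, completed_at=7.0))  # 8.0
```

Either reading of the completion time lands in the third sampling window, so the completion is forwarded at 0.5 + 3 * 2.5 = 8.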

Tbh I don't know if that's supposed to be the case though. Or whether on source completion, sample should emit completion immediately.

@matiboy
Collaborator

matiboy commented Jan 1, 2023

Following up on this, looking at RxJS specs it looks like the expected behaviour is to complete if source completes and even drop any value during the last (incomplete) sample.

I've reproduced the test and it indeed fails.

@dbrattli I would think this is a bug, or at least an inconsistency with RxJS (sorry that's my "reference" library, can't really find the tests in other languages)
