How to on_next from background thread in main thread? #667

Open
alek5k opened this issue Nov 18, 2022 · 3 comments

alek5k commented Nov 18, 2022

Hi, I want to pass data from a background thread into the main thread. I have the following code, but it seems that CurrentThreadScheduler() is not doing what I expect.

I'm using reactivex version 4.2.

from reactivex import operators as ops
from reactivex.scheduler import CurrentThreadScheduler, ThreadPoolScheduler
from reactivex import Subject
import threading
from threading import Thread

class MyThread(Thread):
    def __init__(self, callback):
        Thread.__init__(self)
        self.callback = callback

    def run(self):
        self.callback("hello")

my_subject = Subject()

def callback(data):
    ''' This is called in a separate thread '''
    print(f"In callback: {threading.current_thread().name}")
    my_subject.on_next(data)

if __name__ == "__main__":

    thread_to_execute_on = CurrentThreadScheduler()
    # thread_to_execute_on = ThreadPoolScheduler(max_workers=1)

    print(f"Before stream: {threading.current_thread().name}")
    
    background_thread = MyThread(callback=callback)
    
    my_subject.pipe(
        ops.observe_on(thread_to_execute_on)
    ).subscribe(
        lambda data: print(f"In subscription: {threading.current_thread().name}")
    )

    background_thread.start()
    
    input()

The output is:

Before stream: MainThread
In callback: Thread-7
In subscription: Thread-7

I've also tried using ThreadPoolScheduler and the data is correctly passed to the threadpool thread. In that scenario, output is:

Before stream: MainThread
In callback: Thread-7
In subscription: ThreadPoolExecutor-0_0

Is there something I can use to schedule work back on the main thread? For example: ops.observe_on(MainThreadScheduler()). This seems to be quite simple to do in C# and Java.

To be clear, the output I am after is:

Before stream: MainThread
In callback: Thread-7
In subscription: MainThread

matiboy (Collaborator) commented Jan 6, 2023

Hi @alek5k

Looks like the issue is specific to the Subject here.

The repro below is actually not relevant, because all the examples that print "MainThread" are calling on_next from the main thread. Sorry, please ignore it.

import threading

from reactivex import Subject, of, timer, operators as ops
from reactivex.scheduler import CurrentThreadScheduler

thread_to_execute_on = CurrentThreadScheduler()

def print_thread(prefix: str):
    # Build a callback that prints the prefix, the current thread name, and the emitted values.
    return lambda *x: print(prefix, threading.current_thread().name, x)

of(1, 2, 3).pipe(ops.observe_on(thread_to_execute_on)).subscribe(print_thread("OF"))
# => OF MainThread (1,) ; OF MainThread (2,) ; OF MainThread (3,)
timer(2).subscribe(on_next=print_thread("TIMER"), scheduler=thread_to_execute_on)
# => Do not schedule blocking work! <- warning ; TIMER MainThread (0,)
my_subject = Subject()
my_subject.pipe(ops.observe_on(thread_to_execute_on)).subscribe(print_thread('SUBJECT'))
# => SUBJECT Thread-1 ('hello',)

That does appear to be a bug; will look into it.

matiboy (Collaborator) commented Jan 6, 2023

Sorry, my comment above was not a valid repro at all; apologies.
After looking at it again, when calling on_next from another thread, the thread used by CurrentThreadScheduler() will indeed be that other thread, not the one where CurrentThreadScheduler was created.

This is actually stated in the doc and can be seen in the code:

Each instance manages a number of trampolines (and queues), one for each thread that calls a schedule method

and in the code, where a trampoline is looked up (or created) for whichever thread is current when schedule is called, not for the thread on which the scheduler was created:

def get_trampoline(self) -> Trampoline:
    thread = current_thread()
    tramp = self._tramps.get(thread)
    if tramp is None:
        tramp = Trampoline()
        self._tramps[thread] = tramp
    return tramp
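
To make the effect of that lookup concrete, here is a small standard-library sketch. It is not reactivex code: `PerThreadRegistry` and its string "queues" are hypothetical stand-ins for the scheduler and its trampolines, and the only point is that the key is the thread that calls `get()`, not the thread that constructed the object.

```python
import threading


class PerThreadRegistry:
    """Toy stand-in for the per-thread lookup quoted above (not library code)."""

    def __init__(self):
        self._items = {}
        self._lock = threading.Lock()

    def get(self):
        # The key is the *calling* thread, resolved at call time.
        thread = threading.current_thread()
        with self._lock:
            if thread not in self._items:
                self._items[thread] = f"queue-for-{thread.name}"
            return self._items[thread]


registry = PerThreadRegistry()  # constructed on the starting thread
results = {}


def worker():
    # Called from the background thread, so it gets that thread's entry.
    results["worker"] = registry.get()


t = threading.Thread(target=worker, name="worker-1")
t.start()
t.join()
results["main"] = registry.get()

print(results["worker"])                      # queue-for-worker-1
print(results["main"] != results["worker"])   # True
```

Read this way, calling on_next from a background thread would be expected to run the scheduled work on that background thread, which matches the "In subscription: Thread-7" output reported in the issue.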

Will investigate further how to achieve what you were asking for.

alek5k (Author) commented Jan 7, 2023

Thanks @matiboy. I had actually forgotten that I raised a similar issue in the past, at this issue.

I guess I didn't really get to a solution. The ability to do work on the main thread is pretty simple in C# RX and it would definitely be nice to have in RxPY.

A lot of the responses seem to assume that you have a UI event loop, which is not always the case. For example, imagine a while loop in the main thread that waits on a user input(), as in a CLI.
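
For the no-event-loop case, one possible workaround (a sketch only, not something the thread or the library documentation prescribes) is to hand items to the main thread through a `queue.Queue` and drain it there. The names `pump_on_current_thread`, `inbox` and `_STOP` are hypothetical, and the example uses only the standard library so that it runs without reactivex installed.

```python
import queue
import threading

_STOP = object()  # sentinel that tells the draining loop to exit


def pump_on_current_thread(inbox, handler):
    """Drain `inbox` on the calling thread, passing each item to `handler`."""
    while True:
        item = inbox.get()  # blocks until a producer puts something
        if item is _STOP:
            break
        handler(item)


inbox = queue.Queue()
seen = []
pumping_thread = threading.current_thread()  # the thread that will drain the queue


def handler(data):
    # Record the item and whether we are running on the draining thread.
    seen.append((data, threading.current_thread() is pumping_thread))


def background():
    # Runs on the background thread: only enqueues, never calls the handler.
    inbox.put("hello")
    inbox.put(_STOP)


worker = threading.Thread(target=background)
worker.start()
pump_on_current_thread(inbox, handler)
worker.join()

print(seen)  # [('hello', True)]
```

In the original example, `handler` could be `my_subject.on_next`, so that subscribers run on the thread that drains the queue. The trade-off is that this thread then blocks in `inbox.get()` rather than in `input()`, so a CLI would presumably need to route user input through the same queue or poll the queue with a timeout.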
