Connectable observable sends events multiple times #591

Open
igordertigor opened this issue Dec 3, 2021 · 2 comments

igordertigor commented Dec 3, 2021

I'm using rxpy for a real-time audio processing tool. The tool receives two event streams: one that contains audio chunks and one that contains short text-like annotation snippets. The flow looks similar to this:

audio ----> .publish() ----> rec ----> p1 ---> p2 ---> p3 ----> p4 ---> merge ---> output
                                        \              /               /
                                          ---> p5 ----                /
annotations --------------------------------------------------> p6 ---

Here, the p<x> nodes are processing nodes (implemented in numpy/pytorch) and the rec node is a recorder that writes its input to disk and otherwise passes it on unchanged. I'm using the .publish() call to support the branching that comes after the rec node.

When I open the audio file written by the rec node, every chunk has been written 3 times, which implies that the rec node received every chunk 3 times. Is this intended behaviour? How can I avoid this? I'm worried that downstream nodes (p1-p5) might receive multiple repetitions of the same chunk as well and therefore might not operate as intended. However, the pipeline as a whole seems to work correctly.

I tried a number of variations:

  1. No .publish() call: Events in the first pipeline get stuck right before p3. p4 never receives any events.
  2. Introduce a separate step before the rec node that drops events if they have the same md5 sum as the previous event (either using a combination of scan and filter or a filter with a class). This makes the audio file look ok, but the overall pipeline becomes prohibitively slow and is essentially broken.
  3. Move the .publish() call to a later stage: The results are essentially the same as 1.
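For reference, variation 2 (the "filter with a class" form) might look roughly like the sketch below. The class name `DropRepeatedChunks` and the sample byte strings are made up for illustration, and it assumes chunks are `bytes` (a numpy array would first need something like `.tobytes()`). It only uses the standard library, so it shows the predicate itself rather than the rxpy wiring.

```python
import hashlib


class DropRepeatedChunks:
    """Stateful predicate: False if a chunk has the same MD5 as the previous one."""

    def __init__(self) -> None:
        self._last_digest = None  # digest of the most recent chunk seen

    def __call__(self, chunk: bytes) -> bool:
        digest = hashlib.md5(chunk).digest()
        is_new = digest != self._last_digest
        self._last_digest = digest
        return is_new


# Hypothetical sample data: the first chunk arrives three times in a row.
keep = DropRepeatedChunks()
chunks = [b"aa", b"aa", b"aa", b"bb", b"bb", b"aa"]
deduplicated = [c for c in chunks if keep(c)]
```

Note that a predicate like this hashes every chunk and also drops legitimate consecutive repeats (for example, identical chunks of digital silence), so it hides the duplication rather than removing its cause.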

Thank you for your help.

MainRo (Collaborator) commented Dec 7, 2021

Hello @igordertigor,

How are the branch at p1 and the merge at p3 implemented? Your issue may come from that part of the pipeline.

MainRo added the question label Dec 7, 2021

igordertigor (Author) commented

Hi @MainRo,
thanks for your reply. Unfortunately it got lost in my GitHub notifications. Regarding your question: I'm calling .publish() on the audio node. Then I store the output up to p1 in a variable that I use in both p2 and p5, i.e. conceptually like this:

inp = audio.publish()
out_p1 = inp.pipe(map(rec), map(p1))
out_p2 = out_p1.pipe(map(p2))
out_p5 = out_p1.pipe(map(p5))
out_p3 = out_p2.pipe(merge(out_p5), ...)

Once the whole pipeline is set up, I call audio.connect(). Thanks for your help.
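One possible reading of the snippet above, sketched as a toy model in plain Python rather than with rxpy itself: `publish()` shares only the source, while the `map(rec), map(p1)` chain built on top of it is set up again for every subscriber of `out_p1`. The classes `HotSource`, `Mapped`, and `Shared` are hypothetical stand-ins, not rxpy classes, and the model has only two branches, so it yields two calls rather than the three observed (a third subscriber may sit in the elided part of the pipeline).

```python
from typing import Callable, List

Observer = Callable[[object], None]


class HotSource:
    """Toy stand-in for a connected, published source: one producer, many observers."""

    def __init__(self) -> None:
        self._observers: List[Observer] = []

    def subscribe(self, on_next: Observer) -> None:
        self._observers.append(on_next)

    def emit(self, value: object) -> None:
        for on_next in list(self._observers):
            on_next(value)


class Mapped:
    """Toy stand-in for source.pipe(map(fn)): each subscription runs fn itself."""

    def __init__(self, source, fn) -> None:
        self._source = source
        self._fn = fn

    def subscribe(self, on_next: Observer) -> None:
        self._source.subscribe(lambda value: on_next(self._fn(value)))


class Shared:
    """Subscribes upstream exactly once and fans results out to all observers."""

    def __init__(self, source) -> None:
        self._hot = HotSource()
        source.subscribe(self._hot.emit)

    def subscribe(self, on_next: Observer) -> None:
        self._hot.subscribe(on_next)


def count_rec_calls(share_after_p1: bool) -> int:
    recorded = []

    def rec(chunk):
        recorded.append(chunk)  # stands in for writing the chunk to disk
        return chunk

    audio = HotSource()
    out_p1 = Mapped(Mapped(audio, rec), lambda c: c)  # rec, then an identity "p1"
    if share_after_p1:
        out_p1 = Shared(out_p1)
    out_p1.subscribe(lambda c: None)  # branch towards p2
    out_p1.subscribe(lambda c: None)  # branch towards p5
    audio.emit("chunk-0")
    return len(recorded)


calls_unshared = count_rec_calls(share_after_p1=False)
calls_shared = count_rec_calls(share_after_p1=True)
```

If this is indeed the cause, the analogous change in rxpy would presumably be to apply `publish()` (or `share()`) to the output of `map(p1)`, so that rec and p1 sit upstream of the shared point. Also note that `connect()` is a method of the connectable returned by `publish()`.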
