
Using partition "breaks" program logic #427

Open
scherand opened this issue Jul 14, 2021 · 2 comments

Comments

@scherand

I am struggling to use partition in a pipeline because it "breaks" the logic of my program; presumably because it introduces asynchronous processing.

As a simplified example, I have something that works along the lines of this:

import streamz


def main():
    state = {
        "cnt": 0,
    }
    stream = streamz.Stream()
    cntd = stream.accumulate(cnt,
                             returns_state=True,
                             start=state)
    cntd.sink(print)

    with open("many_lines.txt", "r") as fh:
        for line in fh:
            stream.emit(line)
    print(f"found {state.get('cnt')} lines")


def cnt(state, itm):
    state["cnt"] += 1
    return state, itm


if __name__ == "__main__":
    main()

This runs through all the lines in the file many_lines.txt, counts and prints them, and then reports

found 10000 lines

So far so good.
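For readers unfamiliar with accumulate: its behaviour with returns_state=True can be emulated in plain Python (a sketch of the semantics, not the streamz implementation). The accumulator function returns a (new_state, value_to_emit) pair; the state is threaded through the calls, and the emitted value flows downstream:

```python
def accumulate(func, items, start):
    """Emulate streamz's accumulate(..., returns_state=True) on a plain list."""
    state = start
    emitted = []
    for itm in items:
        # func returns (new_state, value_to_emit)
        state, out = func(state, itm)
        emitted.append(out)
    return state, emitted


def cnt(state, itm):
    state["cnt"] += 1
    return state, itm


state, lines = accumulate(cnt, ["a", "b", "c"], {"cnt": 0})
print(state)   # {'cnt': 3}
print(lines)   # ['a', 'b', 'c']
```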

When I introduce partition now, like this:

import streamz


def main():
    state = {
        "cnt": 0,
    }
    stream = streamz.Stream()
    parted = stream.partition(10001, timeout=2)  # <= PARTITION HERE
    cntd = parted.accumulate(cnt,
                             returns_state=True,
                             start=state)
    cntd.sink(print)

    with open("many_lines.txt", "r") as fh:
        for line in fh:
            stream.emit(line)
    print(f"found {state.get('cnt')} lines")


def cnt(state, itm):
    state["cnt"] += 1
    return state, itm


if __name__ == "__main__":
    main()

I would want to see basically the same result. But I see nothing for some time and then

found 0 lines

I know, there are only 10'000 lines in many_lines.txt so the partition will never fill up, but it should hit the timeout at some point and "release" the data, no?
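To see why nothing appears before the timeout: partition buffers items until the batch size is reached. A size-only emulation in plain Python (ignoring the timeout, which streamz handles on its event loop) makes the problem obvious, because 10'000 items never fill a batch of 10'001:

```python
def partition_sync(items, n):
    """Size-based batching only; streamz's timeout flush needs a running event loop."""
    batch = []
    for itm in items:
        batch.append(itm)
        if len(batch) == n:
            yield tuple(batch)
            batch = []
    # streamz would eventually flush the leftover `batch` via the timeout;
    # this synchronous sketch simply never emits it


batches = list(partition_sync(range(10_000), 10_001))
print(len(batches))  # 0 -- the partition never fills
```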

I suspect that the program terminates before the partition hits the timeout, so I tried (many variations of) awaiting stream.emit(line). That was inspired by the async def process_file(fn): function in Processing Time and Back Pressure.

For example like this:

import streamz


def main():
    state = {
        "cnt": 0,
    }
    stream = streamz.Stream()
    parted = stream.partition(10001, timeout=2)
    cntd = parted.accumulate(cnt,
                             returns_state=True,
                             start=state)
    cntd.sink(print)

    with open("many_lines.txt", "r") as fh:
        for line in fh:
            await stream.emit(line)  # <= USE AWAIT HERE
    print(f"found {state.get('cnt')} lines")


def cnt(state, itm):
    state["cnt"] += 1
    return state, itm


if __name__ == "__main__":
    main()

But this (obviously) does not work (SyntaxError: 'await' outside async function). And I also did not find a way to make it work.
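The SyntaxError itself can be worked around by moving the loop into an async function and driving it with asyncio.run. A minimal sketch with a stand-in coroutine instead of stream.emit (fake_emit is hypothetical; the real emit only awaits usefully when the stream runs on an event loop):

```python
import asyncio

received = []


async def fake_emit(item):
    # stand-in for stream.emit(line); yields control and records the item
    await asyncio.sleep(0)
    received.append(item)


async def process_lines(lines):
    for line in lines:
        await fake_emit(line)  # awaiting is legal here: we are inside async def


asyncio.run(process_lines(["line 1", "line 2", "line 3"]))
print(len(received))  # 3
```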

(How) Can I make sure the for loop terminates before the print statement (or any remaining code, for that matter) is executed? Or am I getting this completely wrong?

My use case is to read (all) lines in pretty big files (which I cannot load into memory at once), send them through a streamz pipeline, and then continue with my program. "Then" meaning after all lines are processed, including those that might be "stuck" in a partition when no more lines are emitted because we reached EOF; this is why I need the timeout, I believe.

@scherand
Author

scherand commented Sep 29, 2021

Found a closely related SO post that unfortunately does not provide a clear answer:

"Is there a nice way to otherwise check if a Stream still contains elements being processed": not that I am aware of.

Is there none or is the person just "not aware of it"?

@scherand
Author

scherand commented Sep 29, 2021

I made some progress now. The following seems to achieve what I was hoping for. I am, however, not 100% sure whether this only works in this pathological toy example. I am mainly unsure whether the call to .wait() blocks the current (and only?) thread, so that no work would be done by streamz while wait()ing in a real application.

import streamz
import threading

from functools import partial


def main():
    # Event used to signal when processing is done
    signal_done = threading.Event()

    state = {
        "cnt": 0,
    }

    done_cb = partial(_signal_done_cb, evnt=signal_done)
    # create a RefCounter keeping track of the number of items in the stream
    ref_c = streamz.RefCounter(cb=done_cb)

    source = streamz.Stream()
    parted = source.partition(10001, timeout=2)  # <= PARTITION HERE
    cntd = parted.accumulate(cnt,
                             returns_state=True,
                             start=state)
    cntd.sink(dev_null)

    with open("many_lines.txt", "r") as fh:
        signal_done.clear()
        for line in fh:
            source.emit(line, metadata=[{"ref": ref_c}])

    signal_done.wait(timeout=5)
    print(f"found {state.get('cnt')} lines")


def cnt(state, itm):
    state["cnt"] += 1
    return state, itm


def dev_null(itm):
    return None


def _signal_done_cb(evnt=None):
    evnt.set()


if __name__ == "__main__":
    main()

This now returns

found 1 lines

after about 2 seconds, which is expected since partition only emits one partition.
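On the question of whether .wait() starves streamz: threading.Event.wait blocks only the calling thread, so work scheduled on other threads (presumably streamz's timeout callbacks running on its background IOLoop thread in this setup) keeps going. A stdlib-only sketch of the pattern, with the background thread standing in for the pipeline:

```python
import threading
import time

done = threading.Event()
result = {}


def background_flush():
    # stands in for streamz flushing the partition after its timeout
    time.sleep(0.2)
    result["cnt"] = 1
    done.set()


threading.Thread(target=background_flush, daemon=True).start()

# blocks only the main thread; the background thread keeps working
finished = done.wait(timeout=5)
print(finished, result)  # True {'cnt': 1}
```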
