Using task_done() in multiple threads #89

kodonnell opened this issue Jan 28, 2019 · 5 comments

@kodonnell
I'd like to use Queue to store items to be processed by threads. However, if one of the items fails to be processed (and task_done is hence not called for it), it's still possible that the item is removed from the queue permanently - whereas one would expect it to remain, as is the usual behaviour.

Example:

import threading
import time

from persistqueue import Queue

q = Queue("testq")


def worker1():
    print("getting from worker1")
    x = q.get()
    print("got", x, "from worker1")
    # processing goes here ... takes some time
    time.sleep(2)
    try:
        # simulate a processing failure
        assert False, "something went wrong"
        q.task_done()
    except Exception:
        print("something went wrong with worker1 in processing", x, "so not calling task_done")


def worker2():
    time.sleep(1)
    print("getting from worker2")
    x = q.get()
    print("got", x, "from worker2")
    # processing would happen here - but finishes quicker than worker1
    print("finished processing", x, "from worker2 so calling task_done")
    q.task_done()
    print("called task_done from worker2")


if __name__ == "__main__":

    q.put("a")
    q.put("b")

    t1 = threading.Thread(target=worker1)
    t1.start()
    t2 = threading.Thread(target=worker2)
    t2.start()
    t1.join()
    t2.join()
    print("reloading q")
    del q
    q = Queue("testq")
    print("qsize", q.qsize())

Output:

getting from worker1
got a from worker1
getting from worker2
got b from worker2
finished processing b from worker2 so calling task_done
called task_done from worker2
something went wrong with worker1 in processing a so not calling task_done
reloading q
qsize 0

As you can see, 'a' was permanently removed, even though task_done wasn't called for it. In other words, I'd expect to see qsize 1 as the output. Is there a way to achieve this, i.e. have task_done complete only the specific task it follows, rather than all outstanding tasks across all threads?

Bonus question: how do I also add 'a' back onto the in-memory queue (ignoring persistence), i.e. the equivalent of SQLiteAckQueue.nack? The only way I can see is reloading the queue from disk (in which case the get wouldn't have persisted), but this seems messy.

(Also, yes, I know of the SQLiteAckQueue which seems well-suited, but I'd prefer to use plain files if possible.)

@kodonnell (Author)

Related: #34 #55

@peter-wangxu (Owner)

@kodonnell this is a known limitation of the file queue. In my opinion, you should re-enqueue the failed items so that they can be processed later. Can this fit your case?

@kodonnell (Author)

this is a known limitation of the file queue

Sorry, I wasn't aware. Can this be documented? It's described as "thread-safe" and this doesn't really fit that bill. Also - doesn't the same apply to the sqlite queue? (I assume that's what the SQLiteAckQueue is for.)

you should re-enqueue the failed items so that they can be processed later. Can this fit your case?

Ah - so you mean every time I .get() I follow it immediately with .task_done(), and then if processing fails, I requeue the item? This should work, though FIFO order wouldn't be preserved - which isn't too much of a drama for us, actually.
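
A rough sketch of that pattern (process() is a hypothetical placeholder for the real work):

from persistqueue import Queue

q = Queue("testq")

def process(item):
    ...  # real work goes here; raises on failure

def handle_one():
    x = q.get()
    q.task_done()  # commit the get immediately, before processing
    try:
        process(x)
    except Exception:
        q.put(x)  # requeue the failed item; it goes to the back, so strict FIFO is lost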

@peter-wangxu (Owner)

@kodonnell SQLiteAckQueue should fit your case well since you have a strict FIFO requirement; I strongly suggest trying it.

The file queue's data is written sequentially, and it's hard to implement an ACK for part of its contents. If you have any ideas, just post them here.

Thanks
Peter
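
For the record, the SQLiteAckQueue route would look roughly like this (a minimal sketch assuming persist-queue's documented ack/nack API; the "ackq" path and the multithreading flag are illustrative):

from persistqueue import SQLiteAckQueue

q = SQLiteAckQueue("ackq", multithreading=True)

q.put("a")
item = q.get()
try:
    # processing goes here
    q.ack(item)  # mark this specific item as done
except Exception:
    q.nack(item)  # return this specific item to the queue for a retry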

@peter-wangxu added the bug label and removed the file label Feb 11, 2019
@avineshwar commented Sep 6, 2022

This should work, though FIFO order wouldn't be preserved ...

Late here, but use multiple similar queues with dedicated roles - i.e. not just one queue but two additional compensating queues, so one queue for successes and two queues for handling failure scenarios - and put items accordingly so that you can jump/swap between them. That should ensure FIFO with some additional load. A sketch follows.
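
One way to sketch a simplified variant of this idea (a single compensating queue per pass; the directory names are illustrative and process() is a placeholder):

from persistqueue import Queue

pending = Queue("q_pending")  # main queue
retry = Queue("q_retry")      # compensating queue for failed items

def process(item):
    ...  # real work goes here; raises on failure

def drain(src, dst):
    # Process everything currently in src; failed items are re-queued to dst
    # in their original order, so a later pass over dst preserves FIFO.
    while src.qsize() > 0:
        item = src.get()
        try:
            process(item)
        except Exception:
            dst.put(item)
        finally:
            src.task_done()  # commit the get either way

drain(pending, retry)  # first pass
drain(retry, pending)  # retry pass; the queues swap roles between passes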
