SQS - Failure to restore unacked messages #1835

Open
YahelOpp opened this issue Dec 19, 2023 · 1 comment

@YahelOpp

Versions:

  • Python 3.10
  • Kombu 5.3.4
  • Celery 5.3.6
  • Boto3 1.33.7

After seeing messages stuck in the NotVisible state in our production queues, I set out to verify that the warm shutdown mechanism works properly. My test setup:

  • Simple task: sleeps for a given number of seconds, then retries with a countdown of 10.
  • Max retries: 3
  • Single worker, threads pool, concurrency 5

What I observe:

  • A task runs
  • It raises a retry, which puts a new message on the queue
  • The same worker immediately consumes the new message (message received)
  • During the countdown period, the message is in NotVisible state
  • During the countdown period, I shut down the worker with SIGTERM
  • The worker successfully starts a warm shutdown
  • The worker notes that it is restoring 1 unacked message
  • The message stays in NotVisible state

Expected behavior: the message should move to Visible state.
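One way to watch this transition from outside the worker is to poll the queue's approximate counters with boto3's `get_queue_attributes`. The helper below is a hypothetical sketch (the name `visibility_counts` is not part of kombu or boto3); it takes any boto3 SQS client, and note that SQS reports these counts only approximately and with some delay:

```python
def visibility_counts(sqs_client, queue_url):
    """Return SQS's approximate message counters for one queue as ints.

    `sqs_client` is expected to be a boto3 SQS client, e.g. boto3.client("sqs").
    """
    attribute_names = [
        "ApproximateNumberOfMessages",            # Visible: available to receive
        "ApproximateNumberOfMessagesNotVisible",  # in flight: received, not deleted
        "ApproximateNumberOfMessagesDelayed",     # delayed, not yet available
    ]
    response = sqs_client.get_queue_attributes(
        QueueUrl=queue_url, AttributeNames=attribute_names
    )
    # SQS returns attribute values as strings; missing ones are treated as 0.
    attributes = response.get("Attributes", {})
    return {name: int(attributes.get(name, 0)) for name in attribute_names}
```

For example, calling it with `boto3.client("sqs")` and the URL from `get_queue_url(QueueName="celery")` (Celery's default queue name; adjust for custom queues or prefixes) before and after the SIGTERM should show the message moving from `ApproximateNumberOfMessagesNotVisible` to `ApproximateNumberOfMessages` if the restore works.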

A possible explanation I've found:
I manually edited the `_put` function in kombu/transport/SQS.py so that, in the `message.get('redelivered')` branch, `change_message_visibility` is called a second time (a duplicate of the existing call). With that change, the problem seemed to go away. I tested this against both real SQS and LocalStack: in both, the problem reproduced consistently and was resolved once I added the duplicate call.

To sum up, the problem might be on boto3's side rather than in kombu, but I'm not sure about this.
I'd be happy to help by opening a PR or providing a simple test scenario.

Thanks

@YahelOpp (Author)

The patch I applied that makes it work as expected:

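```python
from kombu.transport.SQS import Transport

# Patch a fix for the SQS channel: keep a reference to the original _put.
original_put = Transport.Channel._put


def patched_put(self, queue, message, **kwargs):
    # Run kombu's normal _put first; for redelivered messages this already
    # calls change_message_visibility once.
    original_put(self, queue, message, **kwargs)
    if message.get("redelivered"):
        # Workaround: reset the visibility timeout a second time so the
        # restored message actually becomes Visible again.
        q_url = self._new_queue(queue)
        c = self.sqs(queue=self.canonical_queue_name(queue))
        c.change_message_visibility(
            QueueUrl=q_url,
            ReceiptHandle=message['properties']['delivery_tag'],
            VisibilityTimeout=0,
        )


# Monkey-patch the SQS channel class so every _put goes through the wrapper.
Transport.Channel._put = patched_put
```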
