Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Move messages from one queue to another in a safe transact manner #725

Open
AndersMalmgren opened this issue Jun 19, 2023 · 3 comments
Open
Labels

Comments

@AndersMalmgren
Copy link

Hi.
We have made an application using Masstransit and azure service bus. If something goes wrong and after a few retries masstransit will move the message to a error queue. We need a transact way of moving the message from the error queue back to the original queue. And it needs to be transact so we are sure the message will not be removed from the error queue until it have successfully been publish on the destination queue.

@SeanFeldman
Copy link
Collaborator

@AndersMalmgren is your error queue a centralized error queue? I.e. when moving messages back those would be more than one destination?

@AndersMalmgren
Copy link
Author

Mass transit adds a error queue with the postfix _error
All errors for the queue goes here. I created a little tool for now that does the job in a transact way


    public async Task<IEnumerable<Message>> PeekMessages()
    {
        if (_source != null) await _source.DisposeAsync();

        _source = _client.CreateReceiver($"{Path}_error");
        return (await _source.PeekMessagesAsync(100))
            .Select(m => new Message(m));
    }

    public async Task ResendMessages(IEnumerable<ResendableMessage> messages)
    {
        if (_source == null) throw new ArgumentNullException();

        using var ts = new TransactionScope(TransactionScopeAsyncFlowOption.Enabled);
        await using var destination = _client.CreateSender(Path);

        var lookup = messages.ToDictionary(m => m.Seq);

        var lockedMessages = await _source.ReceiveMessagesAsync(1000);

        foreach (var lockedMessage in lockedMessages)
        {
            if (lookup.TryGetValue(lockedMessage.SequenceNumber, out var orginalMessage))
            {
                if(orginalMessage.Message.MessageId != lockedMessage.MessageId) throw new ApplicationException("MessageId desync");
                var send = new ServiceBusMessage(orginalMessage.Message);
                if (orginalMessage.Content != null)
                    send.Body = new BinaryData(orginalMessage.Content);

                await _source.CompleteMessageAsync(lockedMessage);
                await destination.SendMessageAsync(send);
            }
            else
                await _source.AbandonMessageAsync(lockedMessage);
        }

        ts.Complete();
    }

@flower7434
Copy link

I believe I solved this by temporary enable auto forwarding to a temp queue and then back to the original queue.

@ErikMogensen ErikMogensen changed the title Move messags from one queue to another in a safe transact manner Move messages from one queue to another in a safe transact manner Apr 9, 2024
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
Development

No branches or pull requests

4 participants