SQS Transport R-DUPE Warning Causes Messages To Never Be Made Visible #5159

Open
VoX opened this issue May 6, 2024 · 5 comments

Comments

@VoX

VoX commented May 6, 2024

Contact Details

No response

Version

8.x

On which operating system(s) are you experiencing the issue?

Windows

Using which broker(s) did you encounter the issue?

Amazon SQS

What are the steps required to reproduce the issue?

  1. Using the latest version of MassTransit.AmazonSQS (8.2.2), create a consumer that always throws (emulating a consumer fault).
public class TestConsumer : IConsumer<TestMessage>
{
    private readonly ILogger<TestConsumer> logger;

    public TestConsumer(ILogger<TestConsumer> logger)
    {
        this.logger = logger;
    }
    public async Task Consume(ConsumeContext<TestMessage> context)
    {
        await Task.Delay(TimeSpan.FromSeconds(1));

        var redelivered = context.ReceiveContext.Redelivered ? "redelivered" : "";
        logger.LogInformation($"Got Message {context.Message.Id} {redelivered}");

        throw new Exception();
    }
}
  2. Register a MassTransit SQS bus and a receive endpoint that uses the consumer. Set RethrowFaultedMessages so that faulted messages are redelivered by the queue rather than retried by MassTransit's retry policy.
WebApplicationBuilder builder = WebApplication.CreateBuilder();

var services = builder.Services;
services.AddMassTransit(x =>
    {
        x.AddConsumer<TestConsumer>();
        x.UsingAmazonSqs((context, cfg) =>
        {
            cfg.Host("us-east-1", h => { });
            cfg.ReceiveEndpoint("masstransit-test-q", endpointConfigurator =>
            {
                endpointConfigurator.RethrowFaultedMessages();
                endpointConfigurator.ThrowOnSkippedMessages();
                endpointConfigurator.ConfigureConsumer<TestConsumer>(context);
            });

        });
    });
  3. Start the bus and publish a message that is handled by the consumer.
var host = builder.Build();

await host.StartAsync();

var provider = host.Services;
var bus = provider.GetService<IBus>() as IBusControl;

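await bus.StartAsync(); // redundant: host.StartAsync() already started the bus (hence the "Already Started" warning in the log)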
await bus.Publish(new TestMessage { Id = "1" });
  4. Run the program after configuring appropriate AWS credentials.

What is the expected behavior?

The message should be redelivered by the queue and handled by the consumer repeatedly, because the consumer always faults.

What actually happened?

An unexpected R-DUPE warning is emitted in the logs and the message is only handled once. More importantly, the message never becomes visible in the queue again: the "Number Of Messages Not Visible" SQS queue metric stays at 1 and the "Approximate Age Of Oldest Message" metric grows continuously. This persists until the program is terminated, at which point the message becomes visible.
[Screenshot: SQS queue metrics]

MassTransit 8.0.13 and below behave as expected. I believe that when this R-DUPE issue occurs, the task that updates message visibility is never stopped. This seems likely related to the changes in commit a3d66e5.
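To make the suspected failure mode concrete, here is a minimal, self-contained sketch. It assumes (as theorized above, not confirmed from MassTransit's source) that each delivery owns a lock context running a background visibility-renewal loop, and that the duplicate check returns before that context is released. HypotheticalLockContext and HypotheticalDispatcher are illustrative names, not MassTransit types, and the renewal itself is stubbed with a delay.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical stand-in for a per-delivery lock context (not MassTransit's type):
// while it is alive, a background loop periodically extends the message's visibility.
public sealed class HypotheticalLockContext
{
    readonly CancellationTokenSource _cts = new();

    public Task RenewalLoop { get; }

    public HypotheticalLockContext(TimeSpan interval)
    {
        RenewalLoop = Task.Run(async () =>
        {
            try
            {
                while (true)
                {
                    // A real transport would call SQS ChangeMessageVisibility here.
                    await Task.Delay(interval, _cts.Token);
                }
            }
            catch (OperationCanceledException)
            {
                // Released: stop renewing so the message can become visible again.
            }
        });
    }

    public void Release() => _cts.Cancel();
}

// Hypothetical dispatcher with duplicate detection keyed on message id.
public sealed class HypotheticalDispatcher
{
    readonly ConcurrentDictionary<string, byte> _inFlight = new();

    public async Task DispatchAsync(string messageId, HypotheticalLockContext lockContext, Func<Task> consume)
    {
        if (!_inFlight.TryAdd(messageId, 0))
        {
            // "R-DUPE": the same message id is still in flight. Returning here without
            // calling lockContext.Release() leaves its renewal loop running indefinitely.
            return;
        }

        try
        {
            await consume();
        }
        finally
        {
            _inFlight.TryRemove(messageId, out _);
            lockContext.Release();
        }
    }
}
```

Dispatching the same message id twice while the first call is still awaiting its consumer shows the effect: the first context's loop stops when its dispatch finishes, while the duplicate's loop keeps running until something else cancels it.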

We are currently working around this issue by setting endpointConfigurator.RedeliverVisibilityTimeout = 5;, which seems to avoid the R-DUPE issue most of the time.

Related log output, including any exceptions

dbug: Microsoft.Extensions.Hosting.Internal.Host[1]
      Hosting starting
info: MassTransit[0]
      Configured endpoint masstransit-test-q777, Consumer: AmazonSqsReceiveEndpointExample.TestConsumer
dbug: MassTransit.Transports.BusDepot[0]
      Starting bus instances: IBus
dbug: MassTransit[0]
      Starting bus: amazonsqs://us-east-1/
dbug: MassTransit[0]
      Connect: https://us-east-1/
info: Microsoft.Hosting.Lifetime[14]
      Now listening on: https://localhost:59189
info: Microsoft.Hosting.Lifetime[14]
      Now listening on: http://localhost:59190
dbug: Microsoft.AspNetCore.Hosting.Diagnostics[13]
      Loaded hosting startup assembly masstest
dbug: Microsoft.AspNetCore.Hosting.Diagnostics[13]
      Loaded hosting startup assembly Microsoft.AspNetCore.Watch.BrowserRefresh
dbug: MassTransit[0]
      Endpoint Ready: amazonsqs://us-east-1/<redacted>?durable=false&autodelete=true
info: Microsoft.Hosting.Lifetime[0]
      Application started. Press Ctrl+C to shut down.
info: Microsoft.Hosting.Lifetime[0]
      Hosting environment: Development
info: Microsoft.Hosting.Lifetime[0]
      Content root path: <redacted>
dbug: Microsoft.Extensions.Hosting.Internal.Host[2]
      Hosting started
warn: MassTransit[0]
      Start called, but the host was already started: amazonsqs://us-east-1/ (Already Started)
dbug: MassTransit[0]
      Created topic AmazonSqsReceiveEndpointExample-TestMessage arn:aws:sns:us-east-1:<redacted>:AmazonSqsReceiveEndpointExample-TestMessage
dbug: MassTransit[0]
      Created queue masstransit-test-q777 arn:aws:sqs:us-east-1:<redacted>:masstransit-test-q777 https://sqs.us-east-1.amazonaws.com/<redacted>/masstransit-test-q777
dbug: MassTransit[0]
      Binding topic name: AmazonSqsReceiveEndpointExample-TestMessage, durable, subscription-attributes: RawMessageDelivery=true to name: masstransit-test-q777, durable
dbug: MassTransit[0]
      Endpoint Ready: amazonsqs://us-east-1/masstransit-test-q777
info: MassTransit[0]
      Bus started: amazonsqs://us-east-1/
info: MassTransit[0]
      Bus started: amazonsqs://us-east-1/
dbug: MassTransit[0]
      Created topic AmazonSqsReceiveEndpointExample-TestMessage arn:aws:sns:us-east-1:<redacted>:AmazonSqsReceiveEndpointExample-TestMessage
dbug: MassTransit.Messages[0]
      SEND amazonsqs://us-east-1/AmazonSqsReceiveEndpointExample-TestMessage?type=topic b0150000-168a-902e-41e6-08dc6df60b5a AmazonSqsReceiveEndpointExample.TestMessage
info: AmazonSqsReceiveEndpointExample.TestConsumer[0]
      Got Message 1
dbug: MassTransit[0]
      Created topic MassTransit-Fault--AmazonSqsReceiveEndpointExample-TestMessage-- arn:aws:sns:us-east-1:<redacted>:MassTransit-Fault--AmazonSqsReceiveEndpointExample-TestMessage--
dbug: MassTransit.Messages[0]
      SEND amazonsqs://us-east-1/MassTransit-Fault--AmazonSqsReceiveEndpointExample-TestMessage--?type=topic b0150000-168a-902e-ba82-08dc6df60c29 MassTransit.Fault<AmazonSqsReceiveEndpointExample.TestMessage>
fail: MassTransit[0]
      R-FAULT amazonsqs://us-east-1/masstransit-test-q777 b0150000-168a-902e-41e6-08dc6df60b5a AmazonSqsReceiveEndpointExample.TestMessage AmazonSqsReceiveEndpointExample.TestConsumer(00:00:01.0671553)
      System.Exception: Exception of type 'System.Exception' was thrown.
         at AmazonSqsReceiveEndpointExample.TestConsumer.Consume(ConsumeContext`1 context) in C:\Users\<redacted>\Downloads\masstest\Program.cs:line 62
         at MassTransit.DependencyInjection.ScopeConsumerFactory`1.Send[TMessage](ConsumeContext`1 context, IPipe`1 next) in /_/src/MassTransit/DependencyInjection/DependencyInjection/ScopeConsumerFactory.cs:line 22
         at MassTransit.DependencyInjection.ScopeConsumerFactory`1.Send[TMessage](ConsumeContext`1 context, IPipe`1 next) in /_/src/MassTransit/DependencyInjection/DependencyInjection/ScopeConsumerFactory.cs:line 22
         at MassTransit.Middleware.ConsumerMessageFilter`2.MassTransit.IFilter<MassTransit.ConsumeContext<TMessage>>.Send(ConsumeContext`1 context, IPipe`1 next) in /_/src/MassTransit/Middleware/ConsumerMessageFilter.cs:line 48
fail: MassTransit[0]
      R-FAULT amazonsqs://us-east-1/masstransit-test-q777 98ab590b-cee2-42f2-a2e2-f203fe149c99 00:00:01.4274055
      System.Exception: Exception of type 'System.Exception' was thrown.
         at AmazonSqsReceiveEndpointExample.TestConsumer.Consume(ConsumeContext`1 context) in C:\Users\<redacted>\Downloads\masstest\Program.cs:line 62
         at MassTransit.DependencyInjection.ScopeConsumerFactory`1.Send[TMessage](ConsumeContext`1 context, IPipe`1 next) in /_/src/MassTransit/DependencyInjection/DependencyInjection/ScopeConsumerFactory.cs:line 22
         at MassTransit.DependencyInjection.ScopeConsumerFactory`1.Send[TMessage](ConsumeContext`1 context, IPipe`1 next) in /_/src/MassTransit/DependencyInjection/DependencyInjection/ScopeConsumerFactory.cs:line 22
         at MassTransit.Middleware.ConsumerMessageFilter`2.MassTransit.IFilter<MassTransit.ConsumeContext<TMessage>>.Send(ConsumeContext`1 context, IPipe`1 next) in /_/src/MassTransit/Middleware/ConsumerMessageFilter.cs:line 48
         at MassTransit.Middleware.ConsumerMessageFilter`2.MassTransit.IFilter<MassTransit.ConsumeContext<TMessage>>.Send(ConsumeContext`1 context, IPipe`1 next) in /_/src/MassTransit/Middleware/ConsumerMessageFilter.cs:line 73
         at MassTransit.Middleware.TeeFilter`1.<>c__DisplayClass5_0.<<Send>g__SendAsync|1>d.MoveNext() in /_/src/MassTransit/Middleware/TeeFilter.cs:line 40
      --- End of stack trace from previous location ---
         at MassTransit.Middleware.ConsumeContextOutputMessageTypeFilter`1.SendToOutput(IPipe`1 next, ConsumeContext`1 pipeContext) in /_/src/MassTransit/Middleware/ConsumeContextOutputMessageTypeFilter.cs:line 76
         at MassTransit.Middleware.ConsumeContextOutputMessageTypeFilter`1.SendToOutput(IPipe`1 next, ConsumeContext`1 pipeContext) in /_/src/MassTransit/Middleware/ConsumeContextOutputMessageTypeFilter.cs:line 108
         at MassTransit.Middleware.DeserializeFilter.Send(ReceiveContext context, IPipe`1 next) in /_/src/MassTransit/Middleware/DeserializeFilter.cs:line 40
         at MassTransit.Middleware.RescueFilter`2.MassTransit.IFilter<TContext>.Send(TContext context, IPipe`1 next) in /_/src/MassTransit/Middleware/RescueFilter.cs:line 43
warn: MassTransit[0]
      R-DUPE amazonsqs://us-east-1/masstransit-test-q777 98ab590b-cee2-42f2-a2e2-f203fe149c99 98ab590b-cee2-42f2-a2e2-f203fe149c99
fail: MassTransit[0]
      T-FAULT amazonsqs://us-east-1/masstransit-test-q777 98ab590b-cee2-42f2-a2e2-f203fe149c99
      System.Exception: Exception of type 'System.Exception' was thrown.
         at AmazonSqsReceiveEndpointExample.TestConsumer.Consume(ConsumeContext`1 context) in C:\Users\<redacted>\Downloads\masstest\Program.cs:line 62
         at MassTransit.DependencyInjection.ScopeConsumerFactory`1.Send[TMessage](ConsumeContext`1 context, IPipe`1 next) in /_/src/MassTransit/DependencyInjection/DependencyInjection/ScopeConsumerFactory.cs:line 22
         at MassTransit.DependencyInjection.ScopeConsumerFactory`1.Send[TMessage](ConsumeContext`1 context, IPipe`1 next) in /_/src/MassTransit/DependencyInjection/DependencyInjection/ScopeConsumerFactory.cs:line 22
         at MassTransit.Middleware.ConsumerMessageFilter`2.MassTransit.IFilter<MassTransit.ConsumeContext<TMessage>>.Send(ConsumeContext`1 context, IPipe`1 next) in /_/src/MassTransit/Middleware/ConsumerMessageFilter.cs:line 48
         at MassTransit.Middleware.ConsumerMessageFilter`2.MassTransit.IFilter<MassTransit.ConsumeContext<TMessage>>.Send(ConsumeContext`1 context, IPipe`1 next) in /_/src/MassTransit/Middleware/ConsumerMessageFilter.cs:line 73
         at MassTransit.Middleware.TeeFilter`1.<>c__DisplayClass5_0.<<Send>g__SendAsync|1>d.MoveNext() in /_/src/MassTransit/Middleware/TeeFilter.cs:line 40
      --- End of stack trace from previous location ---
         at MassTransit.Middleware.ConsumeContextOutputMessageTypeFilter`1.SendToOutput(IPipe`1 next, ConsumeContext`1 pipeContext) in /_/src/MassTransit/Middleware/ConsumeContextOutputMessageTypeFilter.cs:line 76
         at MassTransit.Middleware.ConsumeContextOutputMessageTypeFilter`1.SendToOutput(IPipe`1 next, ConsumeContext`1 pipeContext) in /_/src/MassTransit/Middleware/ConsumeContextOutputMessageTypeFilter.cs:line 108
         at MassTransit.Middleware.DeserializeFilter.Send(ReceiveContext context, IPipe`1 next) in /_/src/MassTransit/Middleware/DeserializeFilter.cs:line 40
         at MassTransit.Middleware.RescueFilter`2.MassTransit.IFilter<TContext>.Send(TContext context, IPipe`1 next) in /_/src/MassTransit/Middleware/RescueFilter.cs:line 43
         at MassTransit.Internals.ExceptionExtensions.Rethrow(Exception exception) in /_/src/MassTransit.Abstractions/Internals/Extensions/ExceptionExtensions.cs:line 15
         at MassTransit.Middleware.RethrowErrorTransportFilter.Send(ExceptionReceiveContext context, IPipe`1 next) in /_/src/MassTransit/Middleware/RethrowErrorTransportFilter.cs:line 14
         at MassTransit.Middleware.RescueFilter`2.MassTransit.IFilter<TContext>.Send(TContext context, IPipe`1 next) in /_/src/MassTransit/Middleware/RescueFilter.cs:line 61
         at MassTransit.Middleware.DeadLetterFilter.MassTransit.IFilter<MassTransit.ReceiveContext>.Send(ReceiveContext context, IPipe`1 next) in /_/src/MassTransit/Middleware/DeadLetterFilter.cs:line 32
         at MassTransit.Transports.ReceivePipeDispatcher.Dispatch(ReceiveContext context, ReceiveLockContext receiveLock) in /_/src/MassTransit/Transports/ReceivePipeDispatcher.cs:line 65
         at MassTransit.Transports.ReceivePipeDispatcher.Dispatch(ReceiveContext context, ReceiveLockContext receiveLock) in /_/src/MassTransit/Transports/ReceivePipeDispatcher.cs:line 108
         at MassTransit.Transports.ReceivePipeDispatcher.Dispatch(ReceiveContext context, ReceiveLockContext receiveLock) in /_/src/MassTransit/Transports/ReceivePipeDispatcher.cs:line 115
         at MassTransit.AmazonSqsTransport.Middleware.AmazonSqsMessageReceiver.HandleMessage(Message message, ReceiveLockContext lockContext) in /_/src/Transports/MassTransit.AmazonSqsTransport/AmazonSqsTransport/Middleware/AmazonSqsMessageReceiver.cs:line 114

Link to repository that demonstrates/reproduces the issue

https://github.com/VoX/masstransit-r-dupe-issue

@VoX VoX changed the title SQS Transport R-DUPE Error Causes Messages To Never Be Made Visibile SQS Transport R-DUPE Error Causes Messages To Never Be Made Visible May 6, 2024
@VoX VoX changed the title SQS Transport R-DUPE Error Causes Messages To Never Be Made Visible SQS Transport R-DUPE Warning Causes Messages To Never Be Made Visible May 6, 2024
@phatboyg
Member

phatboyg commented May 6, 2024

Thanks for pointing this out. When using this type of fault handling, it's recommended to set a delay before redelivery.

sqs.RethrowFaultedMessages();
sqs.ThrowOnSkippedMessages();
sqs.RedeliverVisibilityTimeout = 5;

As you've done. The value can actually be lower; it works with 1 second, but some sort of delay is still recommended.

That being said, it's worth investigating to ensure the lock context isn't being improperly renewed.

@phatboyg
Member

phatboyg commented May 6, 2024

namespace MassTransit.AmazonSqsTransport.Tests;

using System;
using System.Threading.Tasks;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using NUnit.Framework;
using Testing;


[TestFixture]
public class ReleaseLockContext_Specs
{
    [Test]
    public async Task Should_release_subsequent_lock_contexts()
    {
        var services = new ServiceCollection();

        await using var provider = services
            .AddMassTransitTestHarness(x =>
            {
                x.AddConsumer<TestLockContextConsumer>();

                x.AddConfigureEndpointsCallback((context, name, cfg) =>
                {
                    if (cfg is IAmazonSqsReceiveEndpointConfigurator sqs)
                    {
                        sqs.RethrowFaultedMessages();
                        sqs.ThrowOnSkippedMessages();
                        // Left commented out to reproduce the R-DUPE; a redelivery delay masks it
                        // sqs.RedeliverVisibilityTimeout = 1;
                    }
                });
                x.UsingAmazonSqs((context, cfg) =>
                {
                    cfg.LocalstackHost();

                    cfg.ConfigureEndpoints(context);
                });
            })
            .BuildServiceProvider(true);

        var harness = provider.GetTestHarness();

        await harness.Start();
        try
        {
            await harness.Bus.Publish(new TestLockContextMessage() { Id = "567" });

            Assert.That(await harness.Published.Any<TestLockContextRedeliveredMessage>());
        }
        finally
        {
            await harness.Stop();
        }
    }
}


public class TestLockContextConsumer :
    IConsumer<TestLockContextMessage>
{
    readonly ILogger<TestLockContextConsumer> _logger;

    public TestLockContextConsumer(ILogger<TestLockContextConsumer> logger)
    {
        _logger = logger;
    }

    public async Task Consume(ConsumeContext<TestLockContextMessage> context)
    {
        await Task.Delay(TimeSpan.FromSeconds(1));

        var redelivered = context.ReceiveContext.Redelivered ? "redelivered" : "";
        _logger.LogInformation($"Got Message {context.Message.Id} {redelivered}");

        if (context.ReceiveContext.Redelivered)
        {
            await context.Publish(new TestLockContextRedeliveredMessage() { Id = context.Message.Id });
            return;
        }

        throw new Exception("This is intentional");
    }
}


public record TestLockContextMessage
{
    public string Id { get; init; }
}


public record TestLockContextRedeliveredMessage
{
    public string Id { get; init; }
}

@phatboyg
Member

phatboyg commented May 6, 2024

This unit test reproduces it, though you may have to run it a few times before the message is redelivered and marked as a duplicate.

@VoX
Author

VoX commented May 7, 2024

We originally tried RedeliverVisibilityTimeout = 1 but still occasionally saw this issue in production workloads. Unfortunately, when it happens to even a single message, the "Approximate Age Of Oldest Message" metric rises continuously, which eventually requires someone to investigate the root cause.

@phatboyg
Member

phatboyg commented May 7, 2024

Yeah, the task is never canceled, so in theory it will keep renewing up to the 12-hour limit. The trick is how to fix it without other breaking side effects.
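One general way to make a renewal loop impossible to leak is to bind its lifetime to a scope that is disposed on every exit path, including an early duplicate return. The sketch below is only an illustration of that pattern: HypotheticalRenewalScope and HypotheticalReceiver are invented names, the renew callback is a stub, and this is neither MassTransit's API nor its eventual fix. It also does not address the side effects mentioned above.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical helper (not a MassTransit API): ties a renewal loop's lifetime to a scope.
public sealed class HypotheticalRenewalScope : IAsyncDisposable
{
    readonly CancellationTokenSource _cts = new();
    readonly Task _loop;

    public HypotheticalRenewalScope(TimeSpan interval, Func<CancellationToken, Task> renew)
    {
        _loop = Task.Run(async () =>
        {
            try
            {
                while (true)
                {
                    await Task.Delay(interval, _cts.Token);
                    await renew(_cts.Token); // e.g. extend the SQS visibility timeout
                }
            }
            catch (OperationCanceledException)
            {
                // Scope disposed: renewal stops.
            }
        });
    }

    public bool IsRenewing => !_loop.IsCompleted;

    // Cancels the loop and waits for it, so no renewal outlives the scope.
    public async ValueTask DisposeAsync()
    {
        _cts.Cancel();
        await _loop;
        _cts.Dispose();
    }
}

public static class HypotheticalReceiver
{
    // Every exit path, including the early duplicate return, disposes the scope.
    public static async Task<HypotheticalRenewalScope> HandleAsync(bool isDuplicate, Func<Task> consume)
    {
        var scope = new HypotheticalRenewalScope(TimeSpan.FromMilliseconds(5), _ => Task.CompletedTask);
        await using (scope)
        {
            if (isDuplicate)
                return scope; // "R-DUPE" path: still released on the way out

            await consume(); // if this throws, the scope is still disposed
        }

        return scope;
    }
}
```

The design choice is that callers cannot forget to stop renewal: the only way out of the await using block runs DisposeAsync, which cancels and awaits the loop.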

Projects
None yet
Development

No branches or pull requests

2 participants