Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

handler messaging processing error #273

Open
wants to merge 8 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 14 additions & 14 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,19 +1,19 @@
################################################################################
# This .gitignore file was automatically created by Microsoft(R) Visual Studio.
################################################################################

/.vs
/.idea
/artifacts
*/**/bin
*/**/obj
**/packages
*/**/*.csproj.user
################################################################################
# This .gitignore file was automatically created by Microsoft(R) Visual Studio.
################################################################################
/.vs
/.idea
/artifacts
*/**/bin
*/**/obj
**/packages
*/**/*.csproj.user
TestResults
*/**/BenchmarkDotNet.Artifacts
**/*.DotSettings.user
*/**/appsettings.Development.json
*.user
*.suo
.fake
.ionide
*.suo
.fake
.ionide
1 change: 1 addition & 0 deletions NBB.Messaging.slnf
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
"src\\Messaging\\NBB.Messaging.Effects\\NBB.Messaging.Effects.csproj",
"src\\Messaging\\NBB.Messaging.Host\\NBB.Messaging.Host.csproj",
"src\\Messaging\\NBB.Messaging.InProcessMessaging\\NBB.Messaging.InProcessMessaging.csproj",
"src\\Messaging\\NBB.Messaging.JetStream\\NBB.Messaging.JetStream.csproj",
"src\\Messaging\\NBB.Messaging.MultiTenancy\\NBB.Messaging.MultiTenancy.csproj",
"src\\Messaging\\NBB.Messaging.Nats\\NBB.Messaging.Nats.csproj",
"src\\Messaging\\NBB.Messaging.Noop\\NBB.Messaging.Noop.csproj",
Expand Down
34 changes: 17 additions & 17 deletions src/Messaging/NBB.Messaging.Abstractions/IMessageBusSubscriber.cs
Original file line number Diff line number Diff line change
@@ -1,20 +1,20 @@
// Copyright (c) TotalSoft.
// This source code is licensed under the MIT license.

using System;
using System.Threading;
using System.Threading.Tasks;

namespace NBB.Messaging.Abstractions
{
public interface IMessageBusSubscriber
{
Task<IDisposable> SubscribeAsync<TMessage>(Func<MessagingEnvelope<TMessage>, Task> handler,
MessagingSubscriberOptions options = null, CancellationToken cancellationToken = default);


Task<IDisposable> SubscribeAsync(Func<MessagingEnvelope, Task> handler,
MessagingSubscriberOptions options = null, CancellationToken cancellationToken = default)
=> SubscribeAsync<object>(handler, options, cancellationToken);
}
}
using System;
using System.Threading;
using System.Threading.Tasks;
namespace NBB.Messaging.Abstractions
{
public interface IMessageBusSubscriber
{
Task<IDisposable> SubscribeAsync<TMessage>(Func<MessagingEnvelope<TMessage>, Task> handler,
MessagingSubscriberOptions options = null, CancellationToken cancellationToken = default);
Task<IDisposable> SubscribeAsync(Func<MessagingEnvelope, Task> handler,
MessagingSubscriberOptions options = null, CancellationToken cancellationToken = default)
=> SubscribeAsync<object>(handler, options, cancellationToken);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ public interface IMessagingTransport
/// <param name="options">Subscription options</param>
/// <param name="cancellationToken"></param>
/// <returns>An object that when disposed unsubscribes the handler from the topic</returns>
Task<IDisposable> SubscribeAsync(string topic, Func<TransportReceiveContext, Task> handler,
Task<IDisposable> SubscribeAsync(string topic, Func<TransportReceiveContext, Task<PipelineResult>> handler,
SubscriptionTransportOptions options = null, CancellationToken cancellationToken = default);

/// <summary>
Expand All @@ -45,6 +45,11 @@ public record TransportSendContext(

public record TransportReceiveContext(TransportReceivedData ReceivedData);

public record PipelineResult(bool Success, string Error)
{
public static PipelineResult SuccessResult = new PipelineResult(true, null);
}


public abstract record TransportReceivedData
{
Expand Down
24 changes: 18 additions & 6 deletions src/Messaging/NBB.Messaging.Abstractions/MessageBusSubscriber.cs
Original file line number Diff line number Diff line change
Expand Up @@ -36,11 +36,12 @@ public class MessageBusSubscriber : IMessageBusSubscriber

var topicName = _topicRegistry.GetTopicForName(topicNameWithoutPrefix);

async Task MsgHandler(TransportReceiveContext receiveContext)
async Task<PipelineResult> MsgHandler(TransportReceiveContext receiveContext)
{
_logger.LogDebug("Messaging subscriber received message from subject {Subject}", topicName);

MessagingEnvelope<TMessage> messageEnvelope = null;

try
{
messageEnvelope = receiveContext.ReceivedData switch
Expand All @@ -51,19 +52,30 @@ async Task MsgHandler(TransportReceiveContext receiveContext)
=> new MessagingEnvelope<TMessage>(headers, _messageSerDes.DeserializePayload<TMessage>(payloadBytes, headers, options?.SerDes)),
_ => throw new Exception("Invalid received message data")
};
}
catch (Exception ex)
{
_logger.LogError(ex, "Deserialization error on topic {Subject}.", topicName);

if (messageEnvelope != null)
_deadLetterQueue.Push(messageEnvelope, topicName, ex);
else
_deadLetterQueue.Push(receiveContext.ReceivedData, topicName, ex);

return new PipelineResult(false, ex.Message);
}

try
{
await handler(messageEnvelope);
return PipelineResult.SuccessResult;
}
catch (Exception ex)
{
_logger.LogError(ex, "Messaging subscriber encountered an error when handling a message from subject {Subject}.",
topicName);

if (messageEnvelope != null)
_deadLetterQueue.Push(messageEnvelope, topicNameWithoutPrefix, ex);
else
_deadLetterQueue.Push(receiveContext.ReceivedData, topicNameWithoutPrefix, ex);

return new PipelineResult(false, ex.Message);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,10 +48,12 @@ public async Task Invoke(MessagingContext context, CancellationToken cancellatio
"An unhandled exception has occurred while processing message of type {MessageType}.",
context.MessagingEnvelope.Payload.GetType().GetPrettyName());

Activity.Current?.SetException(ex);
Activity.Current?.SetStatus(Status.Error);
//Activity.Current?.SetException(ex);
//Activity.Current?.SetStatus(Status.Error);

_deadLetterQueue.Push(context.MessagingEnvelope, context.TopicName, ex);

throw;
}
finally
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ public InProcessMessagingTransport(IStorage storage, ILogger<InProcessMessagingT

public event TransportErrorHandler OnError;

public async Task<IDisposable> SubscribeAsync(string topic, Func<TransportReceiveContext, Task> handler,
public async Task<IDisposable> SubscribeAsync(string topic, Func<TransportReceiveContext, Task<PipelineResult>> handler,
SubscriptionTransportOptions options = null,
CancellationToken cancellationToken = default)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,8 @@ public Task PublishAsync(string topic, TransportSendContext sendContext, Cancell
});
}

public Task<IDisposable> SubscribeAsync(string topic,
Func<TransportReceiveContext, Task> handler,
SubscriptionTransportOptions options = null,
CancellationToken cancellationToken = default)
public Task<IDisposable> SubscribeAsync(string topic, Func<TransportReceiveContext, Task<PipelineResult>> handler,
SubscriptionTransportOptions options = null, CancellationToken cancellationToken = default)
{

IDisposable consumer = null;
Expand Down
2 changes: 1 addition & 1 deletion src/Messaging/NBB.Messaging.Nats/StanMessagingTransport.cs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ public StanMessagingTransport(StanConnectionProvider stanConnectionManager, IOpt
_natsOptions = natsOptions;
}

public Task<IDisposable> SubscribeAsync(string topic, Func<TransportReceiveContext, Task> handler,
public Task<IDisposable> SubscribeAsync(string topic, Func<TransportReceiveContext, Task<PipelineResult>> handler,
SubscriptionTransportOptions options = null,
CancellationToken cancellationToken = default)
{
Expand Down
2 changes: 1 addition & 1 deletion src/Messaging/NBB.Messaging.Noop/NoopMessagingTransport.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ namespace NBB.Messaging.Noop
{
public class NoopMessagingTransport : IMessagingTransport
{
public Task<IDisposable> SubscribeAsync(string topic, Func<TransportReceiveContext, Task> handler,
public Task<IDisposable> SubscribeAsync(string topic, Func<TransportReceiveContext, Task<PipelineResult>> handler,
SubscriptionTransportOptions options = null,
CancellationToken cancellationToken = default)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,12 @@ void NewCustomizer(MessagingEnvelope outgoingEnvelope)

var formattedTopicName = _topicRegistry.GetTopicForName(options.TopicName) ??
_topicRegistry.GetTopicForMessageType(message.GetType());
var operationName = $"{message.GetType().GetPrettyName()} send";

var prettyName = message.GetType().GetPrettyName();
if (prettyName.Contains("AnonymousType"))
prettyName = formattedTopicName;

var operationName = $"{prettyName} send";

using var activity = activitySource.StartActivity(operationName, ActivityKind.Producer);

Expand Down
Original file line number Diff line number Diff line change
@@ -1,21 +1,16 @@
// Copyright (c) TotalSoft.
// This source code is licensed under the MIT license.

using NBB.Core.Abstractions;
using NBB.Messaging.Abstractions;
using OpenTelemetry.Trace;
using OpenTelemetry;
using OpenTelemetry.Context.Propagation;
using OpenTelemetry.Trace;
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Reflection;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using OpenTelemetry.Context.Propagation;
using NBB.Core.Abstractions;
using NBB.Messaging.OpenTelemetry.Publisher;
using System.Reflection.PortableExecutable;

namespace NBB.Messaging.OpenTelemetry.Subscriber
{
Expand Down
6 changes: 3 additions & 3 deletions src/Messaging/NBB.Messaging.Rusi/RusiMessagingTransport.cs
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ internal class RusiMessagingTransport : IMessagingTransport, ITransportMonitor
await _client.PublishAsync(request);
}

public async Task<IDisposable> SubscribeAsync(string topic, Func<TransportReceiveContext, Task> handler,
public async Task<IDisposable> SubscribeAsync(string topic, Func<TransportReceiveContext, Task<PipelineResult>> handler,
SubscriptionTransportOptions options = null, CancellationToken cancellationToken = default)
{
var transport = options ?? SubscriptionTransportOptions.Default;
Expand Down Expand Up @@ -108,8 +108,8 @@ await foreach (var msg in subscription.ResponseStream.ReadAllAsync(cancellationT
{
try
{
await handler(receiveContext);
await ackChannel.Writer.WriteAsync(new() { MessageId = msg.Id });
var result = await handler(receiveContext);
await ackChannel.Writer.WriteAsync(new() { MessageId = msg.Id, Error = result.Error });
}
catch (Exception ex)
{
Expand Down
Loading
Loading