Skip to content

Commit

Permalink
Add safety net to prevent requesting over 100 events at once (#141)
Browse files Browse the repository at this point in the history
* Implement controlled parallelism when handling domain events

* Fix tests

* Update src/Workleap.DomainEventPropagation.Subscription.PullDelivery/EventPullerService.cs

Co-authored-by: Gérald Barré <[email protected]>

* PR Rename suggestion

* Rename derp

* Rework parallelization

* Use ValueTask since most of the time this will return immediately

* Try to fix CI

* Ensure we don't request more than 100 events at once

* Adjust documentation

---------

Co-authored-by: Gérald Barré <[email protected]>
  • Loading branch information
Le-Merle and meziantou committed Jun 17, 2024
1 parent 29b25c5 commit 2817a61
Show file tree
Hide file tree
Showing 2 changed files with 9 additions and 1 deletion.
7 changes: 7 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,7 @@ services.AddPullDeliverySubscription()
"TopicEndpoint": "<azure_topic_uri>",
"TopicName": "<namespace_topic_to_listen_to>"
"SubscriptionName": "<subscription_name_under_specified_topic>",
"MaxDegreeOfParallelism": 10,
"TopicAccessKey": "<secret_value>", // Can be omitted to use Azure Identity (RBAC)
}
}
Expand All @@ -217,12 +218,14 @@ services.AddPullDeliverySubscription()
"TopicEndpoint": "<azure_topic_uri>",
"TopicName": "<namespace_topic_to_listen_to>"
"SubscriptionName": "<subscription_name_under_specified_topic>",
"MaxDegreeOfParallelism": 10,
"TopicAccessKey": "<secret_value>", // Can be omitted to use Azure Identity (RBAC)
},
"TopicSub2": {
"TopicEndpoint": "<azure_topic_uri>",
"TopicName": "<namespace_topic_to_listen_to>"
"SubscriptionName": "<subscription_name_under_specified_topic>",
"MaxDegreeOfParallelism": 10,
"TopicAccessKey": "<secret_value>", // Can be omitted to use Azure Identity (RBAC)
}
}
Expand All @@ -240,6 +243,9 @@ services.AddPullDeliverySubscription()
// Namespace topic subscription name
options.SubscriptionName = "<subscription_name>";

// Maximum degree of parallelism for processing events
options.MaxDegreeOfParallelism = 10;

// Using an access key
options.TopicAccessKey = "<secret_value>";

Expand Down Expand Up @@ -281,6 +287,7 @@ public class ExampleDomainEventHandler : IDomainEventHandler<ExampleDomainEvent>
* You may only define one domain event handler per domain event you wish to handle. If you would require more, use the single allowed domain event handler as a facade for multiple operations.
* Domain event handlers must have idempotent behavior (you could execute it multiple times for the same event and the result would always be the same).
* If your domain event types and handlers are in dedicated assemblies, you can reference the [Workleap.DomainEventPropagation.Abstractions](https://www.nuget.org/packages/Workleap.DomainEventPropagation.Abstractions) packages in order to avoid a dependency on third-parties like Azure and Microsoft extensions.
* For improved performance, it is possible to run multiple event handlers in parallel by adjusting the `MaxDegreeOfParallelism` option in the subscription configuration. The default value is 1.

## Building, releasing and versioning

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ protected override Task ExecuteAsync(CancellationToken stoppingToken)
private class EventGridSubscriptionEventPuller
{
private const int OutputChannelSize = 5000;
private const int MaxEventRequestSize = 100;

private readonly IServiceScopeFactory _serviceScopeFactory;
private readonly ILogger<EventPullerService> _logger;
Expand Down Expand Up @@ -108,7 +109,7 @@ private async IAsyncEnumerable<EventGridClientAdapter.EventGridClientAdapter.Eve
continue;
}

var bundles = await this._eventGridTopicSubscription.Client.ReceiveCloudEventsAsync(this._eventGridTopicSubscription.TopicName, this._eventGridTopicSubscription.SubscriptionName, availableHandlers, cancellationToken).ConfigureAwait(false);
var bundles = await this._eventGridTopicSubscription.Client.ReceiveCloudEventsAsync(this._eventGridTopicSubscription.TopicName, this._eventGridTopicSubscription.SubscriptionName, Math.Min(availableHandlers, MaxEventRequestSize), cancellationToken).ConfigureAwait(false);

foreach (var bundle in bundles)
{
Expand Down

0 comments on commit 2817a61

Please sign in to comment.