A lightweight, high-performance in-memory event bus implementation for .NET 10+ applications that enables decoupled communication through the publish-subscribe pattern. Perfect for implementing domain events, CQRS architectures, and microservice communication patterns within a single application instance.
- Features
- Installation
- Quick Start
- Architecture & Design
- Configuration
- Usage
- Processing Strategies
- API Reference
- Advanced Usage
- Best Practices
- Testing
- Troubleshooting
- Performance
- Migration Guide
- Contributing
- License
- High Performance: Built on .NET's System.Threading.Channels for optimal throughput
- Asynchronous Processing: Non-blocking event publishing and handling
- Type-Safe: Strongly-typed event contracts with compile-time safety
- Flexible Configuration: Choose between Channel-based or General event processing
- Concurrency Control: Built-in semaphore-based concurrency management
- Resilient: Configurable retry policies and error handling
- Bulk Operations: Support for bulk event publishing
- Extensive Logging: Comprehensive logging for monitoring and debugging
- Dependency Injection: First-class DI container support
Install the package from NuGet with the .NET CLI:
dotnet add package Softoverse.EventBus.InMemory
Or via the Package Manager Console:
Install-Package Softoverse.EventBus.InMemory
Create events by implementing the IEvent interface:
using Softoverse.EventBus.InMemory.Abstractions;
public class OrderCreatedEvent : IEvent
{
public string OrderNumber { get; set; }
public decimal Amount { get; set; }
public string CustomerId { get; set; }
}
public class PaymentProcessedEvent : IEvent
{
public string PaymentId { get; set; }
public decimal Amount { get; set; }
}
Note: You can optionally add an Id property to your events if needed:
public class OrderCreatedEvent : IEvent
{
public Guid Id { get; init; } = Guid.CreateVersion7();
public string OrderNumber { get; set; }
public decimal Amount { get; set; }
public string CustomerId { get; set; }
}
Implement event handlers using the IEventHandler<T> interface:
using Microsoft.Extensions.Logging;
using Softoverse.EventBus.InMemory.Abstractions;
public class OrderCreatedEventHandler : IEventHandler<OrderCreatedEvent>
{
private readonly ILogger<OrderCreatedEventHandler> _logger;
public OrderCreatedEventHandler(ILogger<OrderCreatedEventHandler> logger)
{
_logger = logger;
}
public async Task HandleAsync(OrderCreatedEvent @event, CancellationToken cancellationToken = default)
{
_logger.LogInformation("Processing order {OrderNumber} for customer {CustomerId}",
@event.OrderNumber, @event.CustomerId);
// Your business logic here
await ProcessOrderAsync(@event);
}
private async Task ProcessOrderAsync(OrderCreatedEvent orderEvent)
{
// Implementation details...
await Task.Delay(100); // Simulate work
}
}
Create an event processor that orchestrates event handling:
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using Softoverse.EventBus.InMemory.Abstractions;
public class DefaultEventProcessor : IEventProcessor
{
private readonly IServiceProvider _serviceProvider;
private readonly ILogger<DefaultEventProcessor> _logger;
public DefaultEventProcessor(
IServiceProvider serviceProvider,
ILogger<DefaultEventProcessor> logger)
{
_serviceProvider = serviceProvider;
_logger = logger;
}
public async Task ProcessEventAsync(IEvent @event, CancellationToken cancellationToken = default)
{
_logger.LogInformation("Processing event {EventType}",
@event.GetType().Name);
await ProcessEventHandlersAsync(@event, cancellationToken);
}
public async Task ProcessEventHandlersAsync(IEvent @event, CancellationToken cancellationToken = default)
{
using var scope = _serviceProvider.CreateScope();
var handlers = scope.ServiceProvider.GetServices<IEventHandler>();
var applicableHandlers = handlers.Where(h => h.CanHandle(@event)).ToList();
if (!applicableHandlers.Any())
{
_logger.LogWarning("No handlers found for event type {EventType}", @event.GetType().Name);
return;
}
var handlerTasks = applicableHandlers.Select(handler =>
SafeHandleAsync(handler, @event, cancellationToken));
await Task.WhenAll(handlerTasks);
}
private async Task SafeHandleAsync(IEventHandler handler, IEvent @event, CancellationToken cancellationToken)
{
try
{
await handler.HandleAsync(@event, cancellationToken);
}
catch (Exception ex)
{
_logger.LogError(ex, "Handler {HandlerType} failed to process event {EventType}",
handler.GetType().Name, @event.GetType().Name);
}
}
}
Register the event bus in your Program.cs:
using Softoverse.EventBus.InMemory;
var builder = WebApplication.CreateBuilder(args);
// Register EventBus with assemblies containing your handlers
builder.Services.AddEventBus<DefaultEventProcessor>(
builder.Configuration,
[typeof(Program).Assembly] // Add assemblies containing your event handlers
);
var app = builder.Build();
AddEventBus registers IEventBus and IEventProcessor as scoped services. If you need to use them inside singletons or hosted services, resolve them through a scope (for example, via IServiceScopeFactory).
Configure the event bus behavior in your appsettings.json:
{
"EventBusSettings": {
"EventBusType": "Channel",
"MaxConcurrency": 1000,
"ChannelCapacity": -1,
"EventProcessorCapacity": 10,
"ExecuteAfterSeconds": 2,
"RetryAfterSeconds": 5,
"RetryCount": 10,
"EachRetryInterval": 3
}
}
| Parameter | Description | Default Value |
|---|---|---|
| EventBusType | Processing strategy: "Channel" or "General" | "Channel" |
| MaxConcurrency | Maximum concurrent operations | 1000 |
| ChannelCapacity | Channel buffer size (-1 = unbounded) | -1 |
| EventProcessorCapacity | Max concurrent event processors | 10 |
| ExecuteAfterSeconds | Delay before initial execution | 2 |
| RetryAfterSeconds | Delay before retry attempts | 5 |
| RetryCount | Maximum retry attempts | 10 |
| EachRetryInterval | Seconds between retry attempts | 3 |
Inject IEventBus into your services and publish events:
public class OrderService
{
private readonly IEventBus _eventBus;
public OrderService(IEventBus eventBus)
{
_eventBus = eventBus;
}
public async Task CreateOrderAsync(CreateOrderRequest request)
{
// Create order logic...
var order = new Order(request);
// Publish single event
var orderCreatedEvent = new OrderCreatedEvent
{
OrderNumber = order.Number,
Amount = order.Amount,
CustomerId = order.CustomerId
};
await _eventBus.PublishAsync(orderCreatedEvent);
// Publish multiple events
var events = new List<OrderCreatedEvent>
{
orderCreatedEvent,
// ... more events
};
await _eventBus.BulkPublishAsync(events);
}
}
You can have multiple handlers for the same event:
public class EmailNotificationHandler : IEventHandler<OrderCreatedEvent>
{
public async Task HandleAsync(OrderCreatedEvent @event, CancellationToken cancellationToken = default)
{
// Send email notification
}
}
public class InventoryUpdateHandler : IEventHandler<OrderCreatedEvent>
{
public async Task HandleAsync(OrderCreatedEvent @event, CancellationToken cancellationToken = default)
{
// Update inventory
}
}
public class AuditLogHandler : IEventHandler<OrderCreatedEvent>
{
public async Task HandleAsync(OrderCreatedEvent @event, CancellationToken cancellationToken = default)
{
// Log audit entry
}
}
Uses System.Threading.Channels for high-throughput, asynchronous event processing with a background service:
- Pros: High performance, non-blocking publishing, automatic retry handling
- Cons: Events processed asynchronously (eventual consistency)
- Best for: High-volume scenarios, microservices, domain events
{
"EventBusSettings": {
"EventBusType": "Channel"
}
}
Processes events immediately and synchronously:
- Pros: Immediate processing, simpler debugging
- Cons: Blocking operations, lower throughput
- Best for: Simple scenarios, debugging, immediate consistency requirements
{
"EventBusSettings": {
"EventBusType": "General"
}
}
The EventBus architecture consists of four main components:
┌─────────────┐      ┌──────────────┐      ┌────────────────────┐      ┌──────────────┐
│  Publisher  │─────▶│  IEventBus   │─────▶│  IEventProcessor   │─────▶│ IEventHandler│
│  (Service)  │      │  (Channel/   │      │  (Process & Route) │      │  (Multiple)  │
└─────────────┘      │   General)   │      └────────────────────┘      └──────────────┘
                     └──────────────┘
                            │
                            ▼
                 ┌──────────────────────┐
                 │ ChannelEventsHosted  │
                 │       Service        │
                 │ (Background Worker)  │
                 └──────────────────────┘
Publisher → IEventBus.PublishAsync() → Channel.Writer.WriteAsync() → Returns immediately
The ChannelEventBus uses an unbounded or a bounded channel, depending on ChannelCapacity:
- Unbounded Channel (ChannelCapacity = -1): No capacity limit, so publishing never waits and no event is dropped because the buffer is full; memory grows with the backlog
- Bounded Channel (ChannelCapacity > 0): Fixed capacity with BoundedChannelFullMode.Wait, so publishers wait for free space instead of dropping events
ChannelEventsHostedService → Channel.Reader.ReadAsync() → IEventProcessor.ProcessEventAsync()
The ChannelEventsHostedService is a BackgroundService that:
- Continuously reads events from the channel
- Uses a SemaphoreSlim to control concurrent processing (limit = EventProcessorCapacity)
- Processes events through the IEventProcessor implementation
- Handles errors gracefully without crashing the service
// Unbounded Channel Options
var options = new UnboundedChannelOptions()
{
SingleReader = false, // Multiple consumers for max concurrency
SingleWriter = false, // Multiple publishers can write simultaneously
AllowSynchronousContinuations = true // Better performance
};
// Bounded Channel Options
var options = new BoundedChannelOptions(capacity)
{
FullMode = BoundedChannelFullMode.Wait, // Blocks publisher until space available
SingleReader = false, // Multiple concurrent readers
SingleWriter = false, // Multiple concurrent writers
AllowSynchronousContinuations = true
};
Configuration Impact:
- SingleReader = false: Allows multiple concurrent readers, at a small cost compared with the single-reader fast path
- SingleWriter = false: Allows multiple threads to publish at the same time
- AllowSynchronousContinuations = true: Continuations may run synchronously on the thread that completes the operation, which can improve throughput
- FullMode = Wait: When the channel is full, publishers wait until space is available instead of dropping events
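As a rough illustration of the two channel kinds, the sketch below picks one from a capacity value and shows what FullMode.Wait means for a publisher. It uses only System.Threading.Channels; CreateEventChannel is a hypothetical helper rather than part of the library, and treating every non-positive capacity as unbounded is an assumption.

```csharp
using System;
using System.Threading.Channels;
using System.Threading.Tasks;

// Hypothetical helper: chooses the channel kind from a ChannelCapacity-style value.
static Channel<T> CreateEventChannel<T>(int capacity)
{
    if (capacity > 0)
    {
        return Channel.CreateBounded<T>(new BoundedChannelOptions(capacity)
        {
            FullMode = BoundedChannelFullMode.Wait, // writers wait instead of dropping items
            SingleReader = false,
            SingleWriter = false
        });
    }

    // Assumption: any non-positive value (such as -1) means "unbounded".
    return Channel.CreateUnbounded<T>(new UnboundedChannelOptions
    {
        SingleReader = false,
        SingleWriter = false
    });
}

var bounded = CreateEventChannel<string>(capacity: 1);

bool firstAccepted = bounded.Writer.TryWrite("order-1");  // true: the buffer has room
bool secondAccepted = bounded.Writer.TryWrite("order-2"); // false: the buffer is full

// WriteAsync does not fail on a full channel; it stays pending until a slot frees up.
Task pendingWrite = bounded.Writer.WriteAsync("order-2").AsTask();
bool completedWhileFull = pendingWrite.IsCompleted;       // false

bounded.Reader.TryRead(out _); // a consumer takes "order-1", freeing the slot
await pendingWrite;            // the waiting write now completes

Console.WriteLine($"{firstAccepted} {secondAccepted} {completedWhileFull}");
```

With an unbounded channel the same TryWrite calls would both succeed, which is why a slow consumer shows up as memory growth there rather than as a waiting publisher.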
The system uses a SemaphoreSlim to limit concurrent event processing:
private readonly SemaphoreSlim semaphore = new(
eventBusSettings.EventProcessorCapacity, // Initial count
eventBusSettings.EventProcessorCapacity // Maximum count
);
This keeps at most EventProcessorCapacity events in flight at once, so a burst of events cannot overwhelm the system.
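The pattern can be sketched with standard library types only. The loop below is not the library's ChannelEventsHostedService; it is a minimal stand-in that reads from a channel and uses a SemaphoreSlim to cap overlapping work. The names processorCapacity, HandleAsync and ConsumeAsync are assumptions made for the example.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;

const int processorCapacity = 3; // stands in for EventProcessorCapacity
var channel = Channel.CreateUnbounded<int>();
var gate = new SemaphoreSlim(processorCapacity, processorCapacity);
var sync = new object();
int running = 0, peak = 0, processed = 0;

// Hypothetical handler: records how many invocations overlap.
async Task HandleAsync(int item)
{
    lock (sync) { running++; peak = Math.Max(peak, running); }
    await Task.Delay(20); // simulate I/O-bound work
    lock (sync) { running--; processed++; }
}

// Reads every event and starts a handler only after a semaphore slot is free.
async Task ConsumeAsync()
{
    var inFlight = new List<Task>();
    await foreach (int item in channel.Reader.ReadAllAsync())
    {
        await gate.WaitAsync(); // waits while processorCapacity handlers are running
        inFlight.Add(Task.Run(async () =>
        {
            try { await HandleAsync(item); }
            finally { gate.Release(); } // always free the slot, even on failure
        }));
    }
    await Task.WhenAll(inFlight);
}

for (int i = 0; i < 20; i++) channel.Writer.TryWrite(i);
channel.Writer.Complete();
await ConsumeAsync();

Console.WriteLine($"processed={processed}, peak concurrency={peak}");
```

Releasing the semaphore in a finally block matters: a handler that throws would otherwise leak a slot and slowly reduce the effective capacity.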
The GeneralEventBus processes events synchronously:
Publisher → IEventBus.PublishAsync() → IEventProcessor.ProcessEventAsync() → Handlers → Returns
- No channel or background service involved
- Direct processing on the calling thread
- Simpler model but blocks the publisher until all handlers complete
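To make the difference between the two modes concrete, the following sketch times a direct call against an enqueue. It does not use the library; SlowHandlerAsync is a hypothetical handler, and the channel plus background task only imitate the Channel mode described earlier.

```csharp
using System;
using System.Diagnostics;
using System.Threading.Channels;
using System.Threading.Tasks;

// Hypothetical slow handler standing in for real business logic.
static async Task SlowHandlerAsync(string evt) => await Task.Delay(200);

// "General"-style publish: the caller awaits the handler directly.
var direct = Stopwatch.StartNew();
await SlowHandlerAsync("order-1");
direct.Stop();

// "Channel"-style publish: the caller only enqueues; a background loop does the work.
var channel = Channel.CreateUnbounded<string>();
Task worker = Task.Run(async () =>
{
    await foreach (string evt in channel.Reader.ReadAllAsync())
        await SlowHandlerAsync(evt);
});

var queued = Stopwatch.StartNew();
await channel.Writer.WriteAsync("order-2");
queued.Stop();

channel.Writer.Complete();
await worker; // drain the queue before exiting

Console.WriteLine($"direct: {direct.ElapsedMilliseconds} ms, queued: {queued.ElapsedMilliseconds} ms");
```

The direct call takes roughly as long as the handler, while the enqueue returns almost at once; the price is that the caller no longer knows when, or whether, the handler succeeded.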
The AddEventBus<TEventProcessor>() extension method automatically:
- Registers IEventBus and IEventProcessor as Scoped services
- Scans provided assemblies for IEventHandler implementations
- Registers handlers as Scoped services
- Registers both generic (IEventHandler<TEvent>) and non-generic (IEventHandler) interfaces
- Allows multiple handlers per event type
// Handlers are resolved from DI container per scope
using var scope = _serviceProvider.CreateScope();
var handlers = scope.ServiceProvider.GetServices<IEventHandler>();
var applicableHandlers = handlers.Where(h => h.CanHandle(@event));
- Event Published: IEventBus.PublishAsync(event)
- Enqueued (Channel mode): Event written to channel
- Background Worker (Channel mode): Reads from channel
- Semaphore Acquired: Waits if at capacity limit
- Processor Invoked: IEventProcessor.ProcessEventAsync(event)
- Handlers Resolved: DI container provides all applicable handlers
- Parallel Execution: All handlers run concurrently via Task.WhenAll()
- Error Handling: Exceptions caught per handler, don't affect others
- Semaphore Released: Next event can be processed
- Logged: Comprehensive logging at each step
For scenarios requiring a response from event handlers:
TResult result = await eventBus.InvokeAsync<TResult>(@event);
This method:
- Bypasses the channel (even in Channel mode)
- Directly invokes the processor synchronously
- Returns the result from the first applicable handler
- Used for queries or synchronous operations
- Bounded Channels: Fixed memory footprint based on ChannelCapacity
- Unbounded Channels: Dynamic memory allocation, grows with event volume
- Event Lifetime: Events are garbage collected after processing
- Handler Scoping: Handlers created per scope, disposed automatically
All implementations are thread-safe:
- Channel writers/readers are thread-safe by design
- Semaphore handles concurrent access
- Handler resolution uses scoped DI containers
- No shared mutable state in event bus implementations
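The first point can be checked with a small experiment that uses only System.Threading.Channels: several tasks write to one channel at the same time and every item still arrives exactly once. The writer and event counts are arbitrary values chosen for the example.

```csharp
using System;
using System.Linq;
using System.Threading.Channels;
using System.Threading.Tasks;

const int writers = 8;
const int eventsPerWriter = 1_000;

var channel = Channel.CreateUnbounded<int>(new UnboundedChannelOptions
{
    SingleWriter = false, // several publishers write concurrently
    SingleReader = false
});

// Each task plays the role of an independent publisher.
Task[] publishers = Enumerable.Range(0, writers)
    .Select(w => Task.Run(async () =>
    {
        for (int i = 0; i < eventsPerWriter; i++)
            await channel.Writer.WriteAsync(w * eventsPerWriter + i);
    }))
    .ToArray();

await Task.WhenAll(publishers);
channel.Writer.Complete();

int received = 0;
await foreach (int item in channel.Reader.ReadAllAsync())
    received++;

Console.WriteLine(received); // 8000: no write was lost or duplicated
```

Thread safety of the channel does not extend to handler code: any state a handler shares across events still needs its own synchronization.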
The base marker interface for all events in the system.
public interface IEvent;
Note: IEvent is a marker interface with no properties. Events can define their own properties as needed.
Events are created by implementing the IEvent interface:
public class UserRegisteredEvent : IEvent
{
public string UserId { get; set; }
public string Email { get; set; }
}
Note: Since IEvent is a marker interface, you have full control over the properties and structure of your events. You can add an Id property if your use case requires it:
public class UserRegisteredEvent : IEvent
{
public Guid Id { get; init; } = Guid.CreateVersion7();
public string UserId { get; set; }
public string Email { get; set; }
}
The main interface for publishing and invoking events.
public interface IEventBus
{
// Fire-and-forget publish: enqueue background job to process subscribers
ValueTask PublishAsync<TEvent>(TEvent @event, CancellationToken cancellationToken = default)
where TEvent : class, IEvent;
ValueTask BulkPublishAsync<TEvent>(List<TEvent> events, CancellationToken cancellationToken = default)
where TEvent : class, IEvent;
// Wait for the response
ValueTask<TResult> InvokeAsync<TResult>(object @event, CancellationToken cancellationToken = default);
}
Methods:
- PublishAsync(event, cancellationToken)
  - Publishes a single event to all registered handlers
  - Returns immediately (Channel mode) or after processing (General mode)
  - Parameters:
    - event: The event to publish
    - cancellationToken: Optional cancellation token
  - Returns: ValueTask (completes when event is enqueued or processed)
- BulkPublishAsync(events, cancellationToken)
  - Publishes multiple events of the same type
  - Processes sequentially to maintain order
  - Parameters:
    - events: List of events to publish
    - cancellationToken: Optional cancellation token
  - Returns: ValueTask (completes when all events are enqueued or processed)
- InvokeAsync(event, cancellationToken)
  - Synchronously invokes event handlers and waits for a result
  - Bypasses the channel queue (even in Channel mode)
  - Returns result from the event processor
  - Parameters:
    - event: The event to invoke (of type object)
    - cancellationToken: Optional cancellation token
  - Returns: ValueTask<TResult> with the result from handlers
  - Use Cases: Query operations, request-response patterns, synchronous workflows
The base interface for all event handlers.
public interface IEventHandler
{
bool CanHandle(IEvent @event);
Task HandleAsync(IEvent @event, CancellationToken cancellationToken = default);
}
Methods:
- CanHandle(event): Determines if this handler can process the given event
- HandleAsync(event, cancellationToken): Processes the event
Generic interface for type-safe event handlers.
public interface IEventHandler<in TEvent> : IEventHandler where TEvent : IEvent
{
Task HandleAsync(TEvent @event, CancellationToken cancellationToken = default);
// Bridge implementations (auto-implemented)
bool IEventHandler.CanHandle(IEvent @event) => @event is TEvent;
Task IEventHandler.HandleAsync(IEvent @event, CancellationToken cancellationToken)
=> @event is TEvent typed ? HandleAsync(typed, cancellationToken) : Task.CompletedTask;
}
Implementation Example:
public class OrderCreatedHandler : IEventHandler<OrderCreatedEvent>
{
public async Task HandleAsync(OrderCreatedEvent @event, CancellationToken cancellationToken = default)
{
// Handle the event
}
}
Interface for processing events and coordinating handler execution.
public interface IEventProcessor
{
Task ProcessEventAsync(IEvent @event, CancellationToken cancellationToken = default);
Task ProcessEventHandlersAsync(IEvent @event, CancellationToken cancellationToken = default);
Task<TResult> InvokeAsync<TResult>(object @event, CancellationToken cancellationToken = default);
}
Methods:
- ProcessEventAsync(event, cancellationToken)
  - Main entry point for event processing
  - Coordinates the overall processing workflow
  - Typically calls ProcessEventHandlersAsync internally
- ProcessEventHandlersAsync(event, cancellationToken)
  - Resolves and executes all applicable handlers for the event
  - Handles errors per handler without affecting others
  - Executes handlers in parallel using Task.WhenAll()
- InvokeAsync(event, cancellationToken)
  - Processes event and returns a result
  - Used for request-response patterns
  - Returns result from the first applicable handler
Registers the event bus and all event handlers in the DI container.
public static IServiceCollection AddEventBus<TEventProcessor>(
this IServiceCollection services,
IConfiguration configuration,
params List<Assembly> assemblies)
where TEventProcessor : class, IEventProcessor
Parameters:
- services: The service collection
- configuration: Application configuration
- assemblies: Assemblies to scan for event handlers
Returns: The service collection for chaining
Usage:
builder.Services.AddEventBus<DefaultEventProcessor>(
builder.Configuration,
[typeof(Program).Assembly, typeof(OrderHandler).Assembly]
);
Retrieves event bus configuration settings.
public static EventBusSettings GetEventBusSettings(this IConfiguration configuration)
Returns: EventBusSettings instance populated from configuration
Configuration class for event bus behavior.
public class EventBusSettings
{
public const string SectionName = "EventBusSettings";
public int MaxConcurrency { get; set; } = 1000;
public int ChannelCapacity { get; set; } = -1;
public int EventProcessorCapacity { get; set; } = 10;
public int ExecuteAfterSeconds { get; set; } = 2;
public int RetryAfterSeconds { get; set; } = 5;
public int RetryCount { get; set; } = 10;
public int EachRetryInterval { get; set; } = 3;
public int[] RetryIntervals { get; }
}
Properties:
- MaxConcurrency: Maximum concurrent operations (default: 1000)
- ChannelCapacity: Channel buffer size, -1 for unbounded (default: -1)
- EventProcessorCapacity: Max concurrent event processors (default: 10)
- ExecuteAfterSeconds: Initial execution delay (default: 2)
- RetryAfterSeconds: Delay before retry attempts (default: 5)
- RetryCount: Maximum retry attempts (default: 10)
- EachRetryInterval: Seconds between retries (default: 3)
- RetryIntervals: Array of retry intervals based on RetryCount and EachRetryInterval
Create sophisticated event processing logic:
public class RetryableEventProcessor : IEventProcessor
{
private readonly IServiceProvider _serviceProvider;
private readonly EventBusSettings _settings;
private readonly ILogger<RetryableEventProcessor> _logger;
public RetryableEventProcessor(
IServiceProvider serviceProvider,
EventBusSettings settings,
ILogger<RetryableEventProcessor> logger)
{
_serviceProvider = serviceProvider;
_settings = settings;
_logger = logger;
}
public async Task ProcessEventAsync(IEvent @event, CancellationToken cancellationToken = default)
{
var retryIntervals = _settings.RetryIntervals;
for (int attempt = 0; attempt <= _settings.RetryCount; attempt++)
{
try
{
await ProcessEventHandlersAsync(@event, cancellationToken);
return; // Success
}
catch (Exception ex) when (attempt < _settings.RetryCount)
{
_logger.LogWarning(ex, "Attempt {Attempt} failed for event {EventType}. Retrying...",
attempt + 1, @event.GetType().Name);
await Task.Delay(TimeSpan.FromSeconds(retryIntervals[attempt]), cancellationToken);
}
}
}
public async Task ProcessEventHandlersAsync(IEvent @event, CancellationToken cancellationToken = default)
{
// Implementation similar to DefaultEventProcessor
}
}
public class ComplexOrderHandler : IEventHandler<OrderCreatedEvent>
{
private readonly IOrderRepository _orderRepository;
private readonly IEmailService _emailService;
private readonly IPaymentService _paymentService;
private readonly ILogger<ComplexOrderHandler> _logger;
public ComplexOrderHandler(
IOrderRepository orderRepository,
IEmailService emailService,
IPaymentService paymentService,
ILogger<ComplexOrderHandler> logger)
{
_orderRepository = orderRepository;
_emailService = emailService;
_paymentService = paymentService;
_logger = logger;
}
public async Task HandleAsync(OrderCreatedEvent @event, CancellationToken cancellationToken = default)
{
try
{
// Complex business logic with multiple dependencies
var order = await _orderRepository.GetByNumberAsync(@event.OrderNumber);
await _paymentService.InitializePaymentAsync(order.PaymentInfo);
await _emailService.SendOrderConfirmationAsync(order.CustomerId, order);
_logger.LogInformation("Successfully processed order {OrderNumber}", @event.OrderNumber);
}
catch (Exception ex)
{
_logger.LogError(ex, "Failed to process order {OrderNumber}", @event.OrderNumber);
throw; // Re-throw to trigger retry logic
}
}
}
The event bus provides comprehensive logging for monitoring:
// Enable detailed logging in appsettings.json
{
"Logging": {
"LogLevel": {
"Softoverse.EventBus.InMemory": "Information"
}
}
}
[Test]
public async Task OrderCreatedEventHandler_Should_ProcessOrder()
{
// Arrange
var logger = Mock.Of<ILogger<OrderCreatedEventHandler>>();
var handler = new OrderCreatedEventHandler(logger);
var orderEvent = new OrderCreatedEvent
{
OrderNumber = "ORDER-001",
Amount = 100.00m,
CustomerId = "CUST-001"
};
// Act
await handler.HandleAsync(orderEvent);
// Assert
// Verify expected behavior
}
[Test]
public async Task EventBus_Should_DeliverEvents_ToAllHandlers()
{
// Arrange
var services = new ServiceCollection();
services.AddLogging();
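// Empty configuration for the test (assumes the defaults in EventBusSettings apply)
var configuration = new ConfigurationBuilder().Build();
services.AddEventBus<DefaultEventProcessor>(configuration, [typeof(OrderCreatedEventHandler).Assembly]);
using var serviceProvider = services.BuildServiceProvider();
// IEventBus is registered as scoped, so resolve it from a scope
using var scope = serviceProvider.CreateScope();
var eventBus = scope.ServiceProvider.GetRequiredService<IEventBus>();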
// Act
var orderEvent = new OrderCreatedEvent { OrderNumber = "ORDER-001" };
await eventBus.PublishAsync(orderEvent);
// Wait for processing (Channel mode)
await Task.Delay(1000);
// Assert
// Verify handlers were called
}
Use past-tense verbs to indicate something has happened:
✅ Good:
public class OrderCreatedEvent : IEvent { }
public class PaymentProcessedEvent : IEvent { }
public class UserRegisteredEvent : IEvent { }
❌ Avoid:
public class CreateOrderEvent : IEvent { }
public class ProcessPaymentEvent : IEvent { }
public class RegisterUserEvent : IEvent { }
Make events immutable after creation:
public class OrderCreatedEvent : IEvent
{
public string OrderNumber { get; init; } // Use 'init' instead of 'set'
public decimal Amount { get; init; }
public DateTime CreatedAt { get; init; }
public OrderCreatedEvent(string orderNumber, decimal amount)
{
OrderNumber = orderNumber;
Amount = amount;
CreatedAt = DateTime.UtcNow;
}
}
Each event should represent a single business occurrence:
✅ Good - Separate concerns:
public class OrderCreatedEvent : IEvent { }
public class PaymentInitiatedEvent : IEvent { }
public class InventoryReservedEvent : IEvent { }
❌ Avoid - Too broad:
public class OrderProcessedEvent : IEvent
{
public Payment Payment { get; set; }
public Inventory Inventory { get; set; }
public Shipping Shipping { get; set; }
}
Events should contain all information handlers need:
public class OrderCreatedEvent : IEvent
{
public string OrderNumber { get; init; }
public string CustomerId { get; init; }
public decimal TotalAmount { get; init; }
public List<OrderItem> Items { get; init; }
public Address ShippingAddress { get; init; }
public DateTime CreatedAt { get; init; }
}
Each handler should work independently without depending on execution order:
✅ Good - Independent handlers:
public class EmailNotificationHandler : IEventHandler<OrderCreatedEvent>
{
public async Task HandleAsync(OrderCreatedEvent @event, CancellationToken ct)
{
// Self-contained email logic
await _emailService.SendAsync(@event.CustomerId, "Order Confirmation");
}
}
public class InventoryHandler : IEventHandler<OrderCreatedEvent>
{
public async Task HandleAsync(OrderCreatedEvent @event, CancellationToken ct)
{
// Self-contained inventory logic
await _inventoryService.ReserveItems(@event.Items);
}
}
Always handle errors to prevent one handler from affecting others:
public async Task HandleAsync(OrderCreatedEvent @event, CancellationToken ct)
{
try
{
await ProcessOrderAsync(@event);
}
catch (TransientException ex)
{
_logger.LogWarning(ex, "Transient error, will retry");
throw; // Trigger retry logic
}
catch (Exception ex)
{
_logger.LogError(ex, "Permanent error, logging and continuing");
await LogErrorAsync(ex, @event);
// Don't throw - allow other handlers to process
}
}
Handlers should be safe to execute multiple times:
public class PaymentProcessingHandler : IEventHandler<OrderCreatedEvent>
{
public async Task HandleAsync(OrderCreatedEvent @event, CancellationToken ct)
{
// Check if already processed
if (await _paymentService.IsProcessedAsync(@event.OrderNumber))
{
_logger.LogInformation("Payment already processed for {OrderNumber}", @event.OrderNumber);
return;
}
// Process payment
await _paymentService.ProcessAsync(@event);
// Mark as processed
await _paymentService.MarkAsProcessedAsync(@event.OrderNumber);
}
}
Handlers are scoped, so inject scoped or transient services:
public class OrderHandler : IEventHandler<OrderCreatedEvent>
{
private readonly IOrderRepository _repository; // Scoped
private readonly IDbContext _dbContext; // Scoped
public OrderHandler(IOrderRepository repository, IDbContext dbContext)
{
_repository = repository;
_dbContext = dbContext;
}
}
public class RobustEventProcessor : IEventProcessor
{
public async Task ProcessEventAsync(IEvent @event, CancellationToken ct)
{
var retryCount = 0;
var maxRetries = 3;
while (retryCount <= maxRetries)
{
try
{
await ProcessEventHandlersAsync(@event, ct);
return;
}
catch (Exception ex) when (retryCount < maxRetries && IsTransient(ex))
{
retryCount++;
var delay = TimeSpan.FromSeconds(Math.Pow(2, retryCount)); // Exponential backoff
_logger.LogWarning("Retry {Retry}/{Max} after {Delay}s", retryCount, maxRetries, delay.TotalSeconds);
await Task.Delay(delay, ct);
}
}
}
private bool IsTransient(Exception ex) =>
ex is TimeoutException or HttpRequestException or DbUpdateException;
}
public class CircuitBreakerEventProcessor : IEventProcessor
{
// CircuitBreakerAsync returns the async policy type (namespace Polly.CircuitBreaker)
private readonly AsyncCircuitBreakerPolicy _circuitBreaker;
public CircuitBreakerEventProcessor()
{
_circuitBreaker = Policy
.Handle<Exception>()
.CircuitBreakerAsync(
exceptionsAllowedBeforeBreaking: 5,
durationOfBreak: TimeSpan.FromMinutes(1)
);
}
public async Task ProcessEventAsync(IEvent @event, CancellationToken ct)
{
await _circuitBreaker.ExecuteAsync(async () =>
{
await ProcessEventHandlersAsync(@event, ct);
});
}
}
// appsettings.Development.json
{
"EventBusSettings": {
"EventBusType": "General", // Simpler debugging
"EventProcessorCapacity": 1
}
}
// appsettings.Production.json
{
"EventBusSettings": {
"EventBusType": "Channel", // High performance
"EventProcessorCapacity": 50,
"ChannelCapacity": 10000
}
}
- High-volume, bursty traffic: Use unbounded channels (ChannelCapacity: -1)
- Memory-constrained: Use bounded channels with appropriate capacity
- CPU-bound handlers: Lower EventProcessorCapacity (e.g., CPU core count)
- I/O-bound handlers: Higher EventProcessorCapacity (e.g., 50-100)
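The last two guidelines can be turned into a starting value in code. This is only a sketch: SuggestProcessorCapacity is a hypothetical helper, and the default of 50 for I/O-bound work is simply the lower end of the range mentioned above, not a measured recommendation.

```csharp
using System;

// Hypothetical helper: suggests an EventProcessorCapacity value from the workload type.
static int SuggestProcessorCapacity(bool ioBound, int ioBoundCapacity = 50) =>
    ioBound
        ? ioBoundCapacity              // I/O-bound: handlers mostly wait, so allow more overlap
        : Environment.ProcessorCount;  // CPU-bound: about one running handler per core

Console.WriteLine($"CPU-bound: {SuggestProcessorCapacity(ioBound: false)}");
Console.WriteLine($"I/O-bound: {SuggestProcessorCapacity(ioBound: true)}");
```

Treat the result as a first guess and adjust it after measuring throughput and latency under realistic load.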
public class MetricsEventProcessor : IEventProcessor
{
private readonly IMetricsCollector _metrics;
public async Task ProcessEventAsync(IEvent @event, CancellationToken ct)
{
var stopwatch = Stopwatch.StartNew();
try
{
await ProcessEventHandlersAsync(@event, ct);
_metrics.RecordSuccess(@event.GetType().Name, stopwatch.ElapsedMilliseconds);
}
catch (Exception ex)
{
_metrics.RecordFailure(@event.GetType().Name, stopwatch.ElapsedMilliseconds);
throw;
}
}
}
[Fact]
public async Task Handler_Should_ProcessEvent_Successfully()
{
// Arrange
var mockService = new Mock<IOrderService>();
var handler = new OrderCreatedHandler(mockService.Object);
var @event = new OrderCreatedEvent { OrderNumber = "123" };
// Act
await handler.HandleAsync(@event);
// Assert
mockService.Verify(s => s.ProcessOrderAsync("123"), Times.Once);
}
[Fact]
public async Task Processor_Should_CallAllHandlers()
{
// Arrange
var services = new ServiceCollection();
services.AddScoped<IEventHandler, Handler1>();
services.AddScoped<IEventHandler, Handler2>();
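using var provider = services.BuildServiceProvider();
// DefaultEventProcessor also takes a logger; NullLogger (Microsoft.Extensions.Logging.Abstractions) is enough for a test
var processor = new DefaultEventProcessor(provider, NullLogger<DefaultEventProcessor>.Instance);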
// Act
await processor.ProcessEventAsync(new TestEvent());
// Assert - verify both handlers called
}
[Fact]
public async Task EventBus_Integration_Test()
{
var host = Host.CreateDefaultBuilder()
.ConfigureServices((context, services) =>
{
services.AddEventBus<DefaultEventProcessor>(
context.Configuration,
[typeof(TestHandler).Assembly]
);
})
.Build();
await host.StartAsync();
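// IEventBus is registered as scoped, so resolve it from a scope rather than the root provider
using var scope = host.Services.CreateScope();
var eventBus = scope.ServiceProvider.GetRequiredService<IEventBus>();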
await eventBus.PublishAsync(new TestEvent());
await Task.Delay(100); // Wait for processing
// Assert expectations
}
Symptom: Events are published but handlers never execute.
Possible Causes & Solutions:
a) Handler Not Registered
// ❌ Problem: Assembly not included in registration
builder.Services.AddEventBus<DefaultEventProcessor>(
builder.Configuration,
[] // Empty list!
);
// ✅ Solution: Include assemblies with handlers
builder.Services.AddEventBus<DefaultEventProcessor>(
builder.Configuration,
[typeof(Program).Assembly, typeof(MyHandler).Assembly]
);
b) Handler Doesn't Implement IEventHandler
// ❌ Problem: Missing interface implementation
public class MyHandler
{
public Task HandleAsync(MyEvent @event) => Task.CompletedTask;
}
// ✅ Solution: Implement IEventHandler<T>
public class MyHandler : IEventHandler<MyEvent>
{
public async Task HandleAsync(MyEvent @event, CancellationToken ct) { }
}
c) Channel Mode - Background Service Not Started
// ✅ Ensure app is running to start background services
var app = builder.Build();
await app.RunAsync(); // This starts ChannelEventsHostedService
Symptom: Log shows "Ignored null event of type {EventType}"
Cause: Bug in null check logic (checks @event != null! which is inverted)
Current Implementation:
if (@event != null!) // This checks if event is NOT null (bug)
{
_logger.LogWarning("Ignored null event");
return;
}
Workaround: Always ensure events are not null before publishing:
if (@event != null)
{
await _eventBus.PublishAsync(@event);
}
Symptom: One handler's exception prevents other handlers from executing.
Solution: Ensure processor catches exceptions per handler:
public async Task ProcessEventHandlersAsync(IEvent @event, CancellationToken ct)
{
var handlers = GetHandlers(@event);
// ✅ Good: Each handler wrapped in try-catch
var tasks = handlers.Select(h => SafeHandleAsync(h, @event, ct));
await Task.WhenAll(tasks);
}
private async Task SafeHandleAsync(IEventHandler handler, IEvent @event, CancellationToken ct)
{
try
{
await handler.HandleAsync(@event, ct);
}
catch (Exception ex)
{
_logger.LogError(ex, "Handler {Handler} failed", handler.GetType().Name);
// Don't rethrow - let other handlers run
}
}
Symptom: Application hangs when publishing events.
Cause: Bounded channel is full and using FullMode.Wait.
Solutions:
a) Increase Channel Capacity
{
"EventBusSettings": {
"ChannelCapacity": 50000 // Raise the configured capacity
}
}
b) Use Unbounded Channel
{
"EventBusSettings": {
"ChannelCapacity": -1 // Unbounded
}
}
c) Increase Processor Capacity
{
"EventBusSettings": {
"EventProcessorCapacity": 50 // More concurrent processors
}
}
Symptom: High memory usage, possible OutOfMemoryException.
Cause: Unbounded channel accumulating events faster than they're processed.
Solutions:
a) Switch to Bounded Channel
{
"EventBusSettings": {
"ChannelCapacity": 10000
}
}
b) Increase Processing Speed
- Increase EventProcessorCapacity
- Optimize handler performance
- Add more application instances
Symptom: Handler is registered but CanHandle() returns false.
Cause: Type mismatch between published event and handler generic type.
// ❌ Problem: Type mismatch
await _eventBus.PublishAsync(new OrderCreated()); // Different type
public class Handler : IEventHandler<OrderCreatedEvent> { }
// ✅ Solution: Use exact same type
await _eventBus.PublishAsync(new OrderCreatedEvent());
public class Handler : IEventHandler<OrderCreatedEvent> { }
Symptom: Events take too long to process.
Diagnosis:
public class DiagnosticEventProcessor : IEventProcessor
{
public async Task ProcessEventAsync(IEvent @event, CancellationToken ct)
{
var sw = Stopwatch.StartNew();
await ProcessEventHandlersAsync(@event, ct);
_logger.LogInformation("Event {Type} processed in {Ms}ms",
@event.GetType().Name, sw.ElapsedMilliseconds);
}
}
Solutions:
- Profile handler code to find bottlenecks
- Use async I/O operations, not blocking calls
- Consider breaking into smaller events
- Increase EventProcessorCapacity for I/O-bound work
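To find which handler is the bottleneck, it can help to time handlers one at a time rather than the event as a whole. The helper below is a sketch under assumptions: HandlerTiming and its delegate-based signature are hypothetical and independent of the library's IEventHandler<T>; each handler is passed in as a named delegate so the example stays self-contained.

```csharp
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical helper; handlers are represented as named delegates.
public static class HandlerTiming
{
    // Runs each handler sequentially and returns the timings, slowest first.
    public static async Task<List<(string Name, TimeSpan Elapsed)>> MeasureAsync(
        IEnumerable<(string Name, Func<CancellationToken, Task> Run)> handlers,
        CancellationToken ct = default)
    {
        var timings = new List<(string Name, TimeSpan Elapsed)>();
        foreach (var (name, run) in handlers)
        {
            var sw = Stopwatch.StartNew();
            await run(ct);
            sw.Stop();
            timings.Add((name, sw.Elapsed));
        }
        return timings.OrderByDescending(t => t.Elapsed).ToList();
    }
}
```

Running the handlers sequentially here is deliberate: it isolates each handler's own cost, which parallel execution would blur.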
Symptom: Unable to resolve service exception in handlers.
Causes & Solutions:
a) Service Not Registered
// ❌ Problem: Service not registered
public class MyHandler : IEventHandler<MyEvent>
{
public MyHandler(IMyService service) { } // Service not in DI
}
// ✅ Solution: Register dependencies
builder.Services.AddScoped<IMyService, MyService>();
b) Singleton Depending on Scoped
// Problem: Singleton can't depend on scoped
public class EventPublisherHostedService : BackgroundService
{
// IEventBus is scoped, so this is invalid
public EventPublisherHostedService(IEventBus eventBus) { }
}
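// Solution: Use IServiceScopeFactory and create scopes
public class EventPublisherHostedService : BackgroundService
{
private readonly IServiceScopeFactory _scopeFactory;
// IServiceScopeFactory is a singleton, so a hosted service can depend on it safely
public EventPublisherHostedService(IServiceScopeFactory scopeFactory)
{
_scopeFactory = scopeFactory;
}
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
// Resolve the scoped IEventBus from a scope created for this unit of work
using var scope = _scopeFactory.CreateScope();
var eventBus = scope.ServiceProvider.GetRequiredService<IEventBus>();
await eventBus.PublishAsync(new MyEvent(), stoppingToken);
}
}
Symptom: Settings always use default values.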
Check:
// 1. Verify appsettings.json section name
{
"EventBusSettings": { // Must match EventBusSettings.SectionName
"ChannelCapacity": 5000
}
}
// 2. Verify configuration is passed
builder.Services.AddEventBus<DefaultEventProcessor>(
builder.Configuration, // Make sure this is provided
[typeof(Program).Assembly]
);
// 3. Check configuration provider order
var config = new ConfigurationBuilder()
.SetBasePath(Directory.GetCurrentDirectory())
.AddJsonFile("appsettings.json", optional: false)
.AddJsonFile($"appsettings.{env}.json", optional: true)
.AddEnvironmentVariables()
.Build();
{
"Logging": {
"LogLevel": {
"Default": "Information",
"Softoverse.EventBus.InMemory": "Debug",
"Softoverse.EventBus.InMemory.Infrastructure.Channels": "Trace"
}
}
}
public class DiagnosticEventProcessor : IEventProcessor
{
private readonly IEventProcessor _innerProcessor;
private readonly ILogger _logger;
// Wraps an existing processor so that every event is logged and timed
public DiagnosticEventProcessor(IEventProcessor innerProcessor, ILogger<DiagnosticEventProcessor> logger)
{
_innerProcessor = innerProcessor;
_logger = logger;
}
public async Task ProcessEventAsync(IEvent @event, CancellationToken ct)
{
_logger.LogInformation("📨 Event received: {Type}",
@event.GetType().Name);
var sw = Stopwatch.StartNew();
try
{
await _innerProcessor.ProcessEventAsync(@event, ct);
_logger.LogInformation("✅ Event processed in {Ms}ms", sw.ElapsedMilliseconds);
}
catch (Exception ex)
{
_logger.LogError(ex, "❌ Event processing failed after {Ms}ms", sw.ElapsedMilliseconds);
throw;
}
}
}
When debugging, switch to General mode for simpler synchronous execution:
{
"EventBusSettings": {
"EventBusType": "General" // Easier to debug
}
}
Q: Can I use this library across multiple processes?
A: No, this is an in-memory implementation for single-process use. For distributed scenarios, consider message brokers like RabbitMQ, Azure Service Bus, or Apache Kafka.
Q: Are events guaranteed to be processed in order?
A: In Channel mode with SingleReader = false, order is not guaranteed due to parallel processing. For ordered processing, use General mode or implement ordering in your handlers.
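As an illustration of why a single reader preserves order, here is a sketch built on System.Threading.Channels only. It is not the library's consumer loop; OrderedConsumerDemo and the five integer items are assumptions made for the example. One consumer awaits each item before taking the next, so items are handled in the order they were written.

```csharp
using System.Collections.Generic;
using System.Threading.Channels;
using System.Threading.Tasks;

// Hypothetical demo type; integers stand in for events.
public static class OrderedConsumerDemo
{
    public static async Task<List<int>> RunAsync()
    {
        var channel = Channel.CreateUnbounded<int>(new UnboundedChannelOptions
        {
            SingleReader = true // exactly one consumer reads from the channel
        });

        for (int i = 1; i <= 5; i++)
        {
            channel.Writer.TryWrite(i);
        }
        channel.Writer.Complete();

        var processed = new List<int>();
        // A single sequential loop: item N+1 is not started until item N is done.
        await foreach (int item in channel.Reader.ReadAllAsync())
        {
            await Task.Yield(); // stands in for asynchronous handler work
            processed.Add(item);
        }
        return processed;
    }
}
```

Once several consumers read the same channel concurrently, each item is still dequeued in order, but completion order depends on how long each handler takes.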
Q: Can I have multiple event buses in one application?
A: Yes, but you'll need to register them manually with different lifetimes or use named registrations.
Q: How do I handle long-running operations in handlers?
A: Consider:
- Increase EventProcessorCapacity for I/O-bound operations
- Use background jobs (Hangfire, Quartz) for truly long-running work
- Break into smaller events and use saga patterns
Q: Can handlers publish new events?
A: Yes! Just inject IEventBus into your handler and publish:
public class OrderHandler : IEventHandler<OrderCreatedEvent>
{
private readonly IEventBus _eventBus;
public OrderHandler(IEventBus eventBus) => _eventBus = eventBus;
public async Task HandleAsync(OrderCreatedEvent @event, CancellationToken ct)
{
// Process order...
await _eventBus.PublishAsync(new OrderProcessedEvent(), ct);
}
}
Q: What happens if the application crashes?
A: Events in the channel are lost. For durability, consider:
- Persisting events before publishing
- Using durable message brokers
- Implementing outbox pattern
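The outbox idea in the list above can be sketched as follows. Everything here is hypothetical and deliberately simplified: InMemoryOutbox and OutboxEntry are not part of the library, and the in-memory list stands in for a durable table that a real outbox would write in the same transaction as the business data. The point is the ordering: record the event first, publish afterwards, and mark it dispatched only once publishing succeeded.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text.Json;
using System.Threading.Tasks;

// Hypothetical outbox record; Payload holds the serialized event.
public sealed record OutboxEntry(Guid Id, string Type, string Payload)
{
    public bool Dispatched { get; set; }
}

// Stand-in for a durable store. A real outbox would persist to a database.
public sealed class InMemoryOutbox
{
    private readonly List<OutboxEntry> _entries = new();

    // Step 1: record the event before anything is published.
    public void Save(object @event)
    {
        var type = @event.GetType();
        _entries.Add(new OutboxEntry(
            Guid.NewGuid(),
            type.Name,
            JsonSerializer.Serialize(@event, type)));
    }

    // Step 2: publish everything not yet dispatched. If `publish` throws,
    // the entry stays pending and is retried on the next call.
    public async Task<int> DispatchPendingAsync(Func<OutboxEntry, Task> publish)
    {
        int sent = 0;
        foreach (var entry in _entries.Where(e => !e.Dispatched).ToList())
        {
            await publish(entry);
            entry.Dispatched = true;
            sent++;
        }
        return sent;
    }
}
```

In an application, the `publish` delegate would deserialize the payload and hand it to the event bus; after a crash, a startup task could call DispatchPendingAsync to replay whatever was saved but never dispatched.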
- Throughput: Handles thousands of events per second in Channel mode
- Memory Usage: Efficient memory management with bounded channels
- Latency: Sub-millisecond event publishing, configurable processing delays
- Scalability: Scales within a single process by tuning concurrency limits (MaxConcurrency, EventProcessorCapacity)
Based on typical workloads:
| Scenario | Events/sec | Latency (p50) | Latency (p99) | Memory |
|---|---|---|---|---|
| Channel - Unbounded | 50,000+ | <1ms publish | <5ms | ~100MB for 10K events |
| Channel - Bounded (10K) | 40,000+ | <1ms publish | <10ms | Fixed ~50MB |
| General - Sync | 5,000+ | 2-5ms | 20-50ms | Minimal |
Factors affecting performance:
- Handler complexity and execution time
- Number of concurrent processors (EventProcessorCapacity)
- Channel configuration (bounded vs unbounded)
- Number of handlers per event
- Hardware (CPU cores, memory)
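How a concurrency limit shapes throughput can be illustrated with a semaphore. This is a sketch of the general technique only, under assumptions: BoundedConcurrency, the 20 ms simulated handler, and the parameter names are invented for the example, and the library's actual use of EventProcessorCapacity may differ. The method runs a number of work items while never letting more than `capacity` of them execute at once, and reports the peak concurrency it observed.

```csharp
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical demo type showing semaphore-based throttling.
public static class BoundedConcurrency
{
    public static async Task<int> RunAsync(int itemCount, int capacity)
    {
        using var gate = new SemaphoreSlim(capacity);
        var sync = new object();
        int running = 0;
        int peak = 0;

        var tasks = Enumerable.Range(0, itemCount).Select(async _ =>
        {
            await gate.WaitAsync();           // wait for a free processing slot
            try
            {
                lock (sync)
                {
                    running++;
                    peak = Math.Max(peak, running);
                }
                await Task.Delay(20);         // stands in for handler work
            }
            finally
            {
                lock (sync)
                {
                    running--;
                }
                gate.Release();               // hand the slot to the next item
            }
        }).ToList();                          // ToList starts all tasks eagerly

        await Task.WhenAll(tasks);
        return peak;
    }
}
```

For I/O-bound handlers, a higher limit usually raises throughput; for CPU-bound handlers, limits beyond the core count tend to add contention rather than speed.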
MediatR is focused on in-process messaging with CQRS patterns, while Softoverse.EventBus is optimized for event-driven architectures.
Key Differences:
| Feature | MediatR | Softoverse.EventBus |
|---|---|---|
| Processing | Synchronous pipeline | Async channel-based or direct |
| Multiple Handlers | Sequential via Pipeline | Parallel execution |
| Background Processing | Manual (BackgroundService) | Built-in (Channel mode) |
| Notifications | INotification / INotificationHandler<T> | IEvent / IEventHandler<T> |
Migration Steps:
- Replace Notification Interfaces:
// MediatR
public class OrderCreated : INotification
{
public string OrderId { get; set; }
}
public class Handler : INotificationHandler<OrderCreated>
{
public async Task Handle(OrderCreated notification, CancellationToken ct)
{
// Handle
}
}
// Softoverse.EventBus
public class OrderCreatedEvent : EventBase
{
public string OrderId { get; init; }
}
public class Handler : IEventHandler<OrderCreatedEvent>
{
public async Task HandleAsync(OrderCreatedEvent @event, CancellationToken ct)
{
// Handle
}
}
- Replace Publisher:
// MediatR
await _mediator.Publish(new OrderCreated { OrderId = "123" });
// Softoverse.EventBus
await _eventBus.PublishAsync(new OrderCreatedEvent { OrderId = "123" });
- Update DI Registration:
// MediatR
services.AddMediatR(cfg => cfg.RegisterServicesFromAssembly(typeof(Program).Assembly));
// Softoverse.EventBus
services.AddEventBus<DefaultEventProcessor>(
configuration,
[typeof(Program).Assembly]
);
MassTransit is a distributed application framework, while Softoverse.EventBus is simpler and focused on in-memory scenarios.
Migration Steps:
- Simplify Event Definitions:
// MassTransit
public class OrderCreated
{
public Guid CorrelationId { get; set; }
public string OrderId { get; set; }
}
// Softoverse.EventBus
public class OrderCreatedEvent : EventBase
{
public string OrderId { get; init; }
// Id property inherited from EventBase
}
- Replace Consumers with Handlers:
// MassTransit
public class OrderCreatedConsumer : IConsumer<OrderCreated>
{
public async Task Consume(ConsumeContext<OrderCreated> context)
{
var message = context.Message;
// Handle
}
}
// Softoverse.EventBus
public class OrderCreatedHandler : IEventHandler<OrderCreatedEvent>
{
public async Task HandleAsync(OrderCreatedEvent @event, CancellationToken ct)
{
// Handle
}
}
- Simplify Configuration:
// MassTransit
services.AddMassTransit(x =>
{
x.AddConsumer<OrderCreatedConsumer>();
x.UsingInMemory((context, cfg) =>
{
cfg.ConfigureEndpoints(context);
});
});
// Softoverse.EventBus
services.AddEventBus<DefaultEventProcessor>(
configuration,
[typeof(Program).Assembly]
);
If you have a custom event aggregator pattern:
Before:
public interface IEventAggregator
{
void Subscribe<T>(Action<T> handler);
void Publish<T>(T @event);
}
After:
// Define events
public class MyEvent : EventBase
{
public string Data { get; init; }
}
// Create handler
public class MyEventHandler : IEventHandler<MyEvent>
{
public async Task HandleAsync(MyEvent @event, CancellationToken ct)
{
// Handle event
}
}
// Publish
await _eventBus.PublishAsync(new MyEvent { Data = "test" });
Breaking Changes:
- Target framework changed from .NET 9 to .NET 10
- Updated dependencies to Microsoft.Extensions.* version 10.0.2
Migration Steps:
- Update Target Framework:
<!-- Before -->
<TargetFramework>net9.0</TargetFramework>
<!-- After -->
<TargetFramework>net10.0</TargetFramework>
- Update Package Reference:
<PackageReference Include="Softoverse.EventBus.InMemory" Version="10.0.0" />
- Test Your Application:
  - No API changes required
  - Configuration remains compatible
  - Test all event handlers
| EventBus Version | .NET Version | Status |
|---|---|---|
| 10.x | .NET 10+ | Current |
| 1.x | .NET 9+ | Legacy |
| Feature | v1.x | v10.x |
|---|---|---|
| Channel Processing | Yes | Yes |
| General Processing | Yes | Yes |
| InvokeAsync | Yes | Yes |
| BulkPublishAsync | Yes | Yes |
| Configurable Settings | Yes | Yes |
| .NET 10 Support | No | Yes |
// Aggregate publishes domain events
public class Order : AggregateRoot
{
public void Create(string customerId, List<OrderItem> items)
{
// Business logic
var @event = new OrderCreatedEvent
{
OrderId = Id,
CustomerId = customerId,
Items = items
};
AddDomainEvent(@event);
}
}
// Event handlers for side effects
public class EmailNotificationHandler : IEventHandler<OrderCreatedEvent>
{
public async Task HandleAsync(OrderCreatedEvent @event, CancellationToken ct)
{
await _emailService.SendOrderConfirmation(@event.CustomerId);
}
}
public class CreateOrderCommandHandler : IRequestHandler<CreateOrderCommand>
{
private readonly IEventBus _eventBus;
public async Task Handle(CreateOrderCommand command, CancellationToken ct)
{
// Execute command
var order = new Order(command.CustomerId, command.Items);
await _repository.SaveAsync(order);
// Publish event
await _eventBus.PublishAsync(new OrderCreatedEvent
{
OrderId = order.Id,
CustomerId = order.CustomerId
});
}
}
// Order Service
await _eventBus.PublishAsync(new OrderCreatedEvent { OrderId = orderId });
// Inventory Service Handler (same process)
public class InventoryHandler : IEventHandler<OrderCreatedEvent>
{
public async Task HandleAsync(OrderCreatedEvent @event, CancellationToken ct)
{
await _inventoryService.ReserveItems(@event.Items);
}
}
// Shipping Service Handler (same process)
public class ShippingHandler : IEventHandler<OrderCreatedEvent>
{
public async Task HandleAsync(OrderCreatedEvent @event, CancellationToken ct)
{
await _shippingService.CreateShipment(@event.OrderId);
}
}
// Trigger background processing
await _eventBus.PublishAsync(new DataImportRequestedEvent
{
FileUrl = fileUrl,
BatchSize = 1000
});
// Handler processes asynchronously
public class DataImportHandler : IEventHandler<DataImportRequestedEvent>
{
public async Task HandleAsync(DataImportRequestedEvent @event, CancellationToken ct)
{
// Keep the import result so the completion event can report the record count
var result = await _importService.ImportFromUrlAsync(@event.FileUrl, @event.BatchSize);
// Publish completion event
await _eventBus.PublishAsync(new DataImportCompletedEvent
{
FileUrl = @event.FileUrl,
RecordsProcessed = result.Count
}, ct);
}
}
public class EventStore
{
private readonly IEventBus _eventBus;
private readonly List<IEvent> _events = new();
public async Task AppendAsync(IEvent @event)
{
_events.Add(@event);
await _eventBus.PublishAsync(@event);
}
public IEnumerable<IEvent> GetEvents()
{
return _events;
}
}
- MediatR: For CQRS and request/response patterns
- MassTransit: For distributed messaging
- NServiceBus: For enterprise messaging
- Rebus: For service bus implementations
Contributions are welcome! Please feel free to submit a Pull Request. For major changes, please open an issue first to discuss what you would like to change.
- Fork the Repository
  git clone https://github.com/mahmudabir/EventBus.git
  cd EventBus
- Create a Feature Branch
  git checkout -b feature/your-feature-name
- Make Your Changes
  - Follow existing code style and conventions
  - Add tests for new features
  - Update documentation as needed
  - Ensure all tests pass
- Commit Your Changes
  git commit -m "Add: Brief description of your changes"
- Push and Create Pull Request
  git push origin feature/your-feature-name
- Code Style: Follow C# coding conventions and use meaningful names
- Testing: Add unit tests for new features and bug fixes
- Documentation: Update README.md and XML documentation comments
- Commit Messages: Use clear, descriptive commit messages
- Pull Requests: Provide a clear description of changes and reasoning
- Bug fixes and issue resolutions
- New features and enhancements
- Documentation improvements
- Additional test coverage
- Performance optimizations
- Examples and tutorials
# Clone the repository
git clone https://github.com/mahmudabir/EventBus.git
cd EventBus
# Restore dependencies
dotnet restore
# Build the project
dotnet build
# Run tests (if available)
dotnet test
# Create NuGet package
dotnet pack -c Release
This project is licensed under the Apache License 2.0. See the LICENSE file for details.
Permissions:
- Commercial use
- Modification
- Distribution
- Patent use
- Private use
Conditions:
- License and copyright notice
- State changes
Limitations:
- Liability
- Warranty
For the full license text, see LICENSE or visit https://www.apache.org/licenses/LICENSE-2.0
Major Release - .NET 10 Support
Breaking Changes:
- Target framework upgraded from .NET 9.0 to .NET 10.0
- Updated Microsoft.Extensions.* dependencies to version 10.0.2
  - Microsoft.Extensions.Configuration.Binder → 10.0.2
  - Microsoft.Extensions.Hosting.Abstractions → 10.0.2
  - Microsoft.Extensions.Logging.Abstractions → 10.0.2
Improvements:
- Enhanced performance with .NET 10 optimizations
- Comprehensive documentation with architecture details
- Improved troubleshooting guide with common issues and solutions
- Added best practices and design patterns section
- Enhanced testing examples
- Added migration guide from other event bus libraries
- Performance benchmarks and characteristics documented
Notes:
- No API changes - fully compatible with existing code after framework upgrade
- Configuration remains backward compatible
- All features from v1.x are preserved
Migration:
<!-- Update your project file -->
<TargetFramework>net10.0</TargetFramework>
<PackageReference Include="Softoverse.EventBus.InMemory" Version="10.0.0" />
First Public Release
Core Features:
- In-memory event bus implementation
- Two processing strategies:
  - Channel-based (high-performance, async)
  - General (simple, synchronous)
- Publish-subscribe pattern support
- Type-safe event handling with IEventHandler<TEvent>
- Comprehensive dependency injection integration
- Configurable settings via appsettings.json
- Extensive logging support
- Retry policy configuration
- Bulk event publishing support
- Request-response pattern with InvokeAsync<TResult>
Components:
- IEvent / EventBase - Event contracts
- IEventBus - Event publishing interface
- IEventHandler<T> - Type-safe event handlers
- IEventProcessor - Event processing coordination
- ChannelEventBus - Channel-based implementation
- GeneralEventBus - Direct processing implementation
- ChannelEventsHostedService - Background processing service
- EventBusSettings - Configuration model
Configuration Options:
- EventBusType - Choose processing strategy
- MaxConcurrency - Concurrent operation limits
- ChannelCapacity - Buffer size configuration
- EventProcessorCapacity - Concurrent processor limits
- RetryCount / RetryAfterSeconds - Retry policy settings
Target Framework:
- .NET 9.0
Special thanks to:
- The .NET team for the excellent System.Threading.Channels library
- All contributors who help improve this project
- Documentation: Start with this README and the Quick Start guide
- Bug Reports: Create an issue
- Feature Requests: Request a feature
- Questions: GitHub Discussions
For enterprise support, training, or consulting services, please contact the maintainers through GitHub.
If you find this project useful, please consider:
- Starring the repository
- Reporting bugs
- Suggesting new features
- Improving documentation
- Contributing code
Made with ❤️ by mahmudabir