Introduction
MassTransit abstracts message broker communication (RabbitMQ, Azure Service Bus, Amazon SQS) behind a consistent consumer API. When a consumer is registered but does not receive messages, the issue can stem from incorrect consumer registration, mismatched message types, missing endpoint configuration, serialization failures, or the bus not being started. Unlike simple publish/subscribe, MassTransit requires explicit endpoint configuration, proper exchange/queue bindings, and matching message contracts between publishers and consumers.
Symptoms
- Messages published but consumer
Consumemethod never called - Messages sitting in queue unprocessed
- Consumer registered in DI but not connected to bus
- Published messages go to
_skippedqueue - Consumer throws deserialization exception silently
- Only some message types are consumed while others are not
Debug consumer connection:
``csharp
// Check bus health at startup
var busControl = app.Services.GetRequiredService<IBus>();
var health = await busControl.GetHealth();
Console.WriteLine($"Bus healthy: {health.Healthy}");
if (!health.Healthy)
{
Console.WriteLine($"Issues: {string.Join(", ", health.Issues)}");
}
Common Causes
- Consumer not registered with
AddConsumer<T>()beforeUsingRabbitMqconfig - Message type namespace or assembly mismatch between publisher and consumer
- Exchange/queue not created or bound correctly
- Bus not started (missing
app.MapMassTransit()orStartAsync) - JSON serializer cannot deserialize the message body
- Consumer throws exception and message is moved to error queue immediately
Step-by-Step Fix
- 1.Register consumer and configure bus correctly:
- 2.```csharp
- 3.// Program.cs - complete MassTransit setup
- 4.builder.Services.AddMassTransit(x =>
- 5.{
- 6.// Register ALL consumers before configuring the bus
- 7.x.AddConsumer<OrderCreatedConsumer>();
- 8.x.AddConsumer<OrderCancelledConsumer>();
- 9.x.AddConsumer<PaymentProcessedConsumer>();
x.UsingRabbitMq((context, cfg) => { cfg.Host("rabbitmq://localhost", h => { h.Username("guest"); h.Password("guest"); });
// Configure each consumer endpoint cfg.ReceiveEndpoint("order-created-queue", e => { e.ConfigureConsumer<OrderCreatedConsumer>(context);
// Optional: configure retry e.UseMessageRetry(r => r.Intervals(100, 500, 1000, 2000, 5000));
// Optional: configure circuit breaker e.UseCircuitBreaker(cb => { cb.TrackingPeriod = TimeSpan.FromMinutes(1); cb.TripThreshold = 15; cb.ActiveThreshold = 10; cb.ResetInterval = TimeSpan.FromMinutes(5); }); });
cfg.ReceiveEndpoint("order-cancelled-queue", e => { e.ConfigureConsumer<OrderCancelledConsumer>(context); });
// Or use auto-registration for all consumers cfg.ConfigureEndpoints(context); }); });
// CRITICAL: Start the bus builder.Services.AddMassTransitHostedService(true); ```
- 1.Ensure message contract matches between publisher and consumer:
- 2.```csharp
- 3.// Shared message contract - MUST be in a shared assembly
- 4.// Do NOT define the same interface separately in publisher and consumer
- 5.namespace MyApp.Contracts
- 6.{
- 7.public interface OrderCreated
- 8.{
- 9.Guid OrderId { get; }
- 10.string CustomerEmail { get; }
- 11.decimal TotalAmount { get; }
- 12.List<OrderItem> Items { get; }
- 13.}
public record OrderItem { public string ProductName { get; init; } = string.Empty; public int Quantity { get; init; } public decimal Price { get; init; } } }
// Publisher public class OrderService { private readonly IPublishEndpoint _publishEndpoint;
public async Task CreateOrderAsync(CreateOrderCommand command) { var order = new Order { /* ... */ }; await _publishEndpoint.Publish<OrderCreated>(new { OrderId = order.Id, CustomerEmail = order.Email, TotalAmount = order.Total, Items = order.Items.Select(i => new { i.ProductName, i.Quantity, i.Price }).ToList() }); } }
// Consumer - uses the SAME interface from shared assembly public class OrderCreatedConsumer : IConsumer<OrderCreated> { public async Task Consume(ConsumeContext<OrderCreated> context) { var message = context.Message; Console.WriteLine($"Processing order {message.OrderId} for {message.CustomerEmail}"); // Process the order... } } ```
- 1.Configure JSON serializer to handle message types:
- 2.```csharp
- 3.builder.Services.AddMassTransit(x =>
- 4.{
- 5.x.AddConsumer<OrderCreatedConsumer>();
x.UsingRabbitMq((context, cfg) => { cfg.Host("rabbitmq://localhost");
// Configure serializer cfg.UseRawJsonSerializer(); // For raw JSON without MassTransit envelope
// Or configure with System.Text.Json options cfg.UseRawJsonSerializer();
cfg.ConfigureEndpoints(context); }); });
// If using class-based messages (not interfaces), ensure type names match // MassTransit includes message type in the envelope // Publisher and consumer must agree on the type name
// Check what MassTransit is publishing: public class OrderCreatedConsumer : IConsumer<OrderCreated> { private readonly ILogger<OrderCreatedConsumer> _logger;
public async Task Consume(ConsumeContext<OrderCreated> context) { // Log message headers and type info for debugging _logger.LogInformation("Received message type: {MessageType}", context.MessageType); _logger.LogInformation("Message headers: {Headers}", string.Join(", ", context.Headers.All .Select(h => $"{h.Key}={h.Value}")));
// Check if message is from error queue if (context.ReceiveContext.InputAddress.Path.Contains("_error")) { _logger.LogWarning("Processing from error queue - may be a retry"); } } } ```
- 1.Monitor and troubleshoot consumer connections:
- 2.```csharp
- 3.// Health check endpoint
- 4.app.MapGet("/health/masstransit", async (IBus bus) =>
- 5.{
- 6.var health = await bus.GetHealth();
- 7.return health.Healthy
- 8.? Results.Ok(new { Status = "Healthy" })
- 9.: Results.ServiceUnavailable(new
- 10.{
- 11.Status = "Unhealthy",
- 12.Issues = health.Issues
- 13.});
- 14.});
// Log bus connection events builder.Services.AddMassTransit(x => { x.AddConsumer<OrderCreatedConsumer>();
x.UsingRabbitMq((context, cfg) => { cfg.Host("rabbitmq://localhost");
// Log connection events cfg.ConnectObserver(new ConnectionObserver()); cfg.ReceiveObserver(new ReceiveObserver());
cfg.ConfigureEndpoints(context); }); });
public class ConnectionObserver : IConnectObserver { private readonly ILogger<ConnectionObserver> _logger;
public Task OnConnect(ConnectContext context) { _logger.LogInformation("Connected to broker: {Host}", context.HostAddress); return Task.CompletedTask; }
public Task OnDisconnect(DisconnectContext context) { _logger.LogWarning("Disconnected from broker: {Reason}", context.Reason); return Task.CompletedTask; } }
public class ReceiveObserver : IReceiveObserver { private readonly ILogger<ReceiveObserver> _logger;
public Task PreReceive(ReceiveContext context) { _logger.LogInformation("About to receive message from {InputAddress}", context.InputAddress); return Task.CompletedTask; }
public Task PostReceive(ReceiveContext context) { _logger.LogInformation("Received message: {ContentType}, {MessageId}", context.ContentType, context.MessageId); return Task.CompletedTask; }
public Task PostConsume<T>(ConsumeContext<T> context, TimeSpan duration, string consumerType) where T : class { _logger.LogInformation("Consumed {MessageType} by {ConsumerType} in {Duration}ms", typeof(T).Name, consumerType, duration.TotalMilliseconds); return Task.CompletedTask; }
public Task ConsumeFault<T>(ConsumeContext<T> context, TimeSpan duration, string consumerType, Exception exception) where T : class { _logger.LogError(exception, "Consumer {ConsumerType} failed processing {MessageType}", consumerType, typeof(T).Name); return Task.CompletedTask; } } ```
Prevention
- Put message contracts in a shared assembly referenced by both publisher and consumer
- Use
ConfigureEndpoints(context)for automatic consumer registration - Enable message retry with exponential backoff for transient failures
- Add health checks that verify bus connectivity
- Monitor the error queue for failed messages
- Use correlation IDs to trace messages across services
- Log consumer registration at startup to verify all consumers are connected
- Test with an in-memory test harness before deploying to production:
- ```csharp
- var harness = new InMemoryTestHarness();
- harness.Consumer(() => new OrderCreatedConsumer());
- await harness.Start();
`