RabbitMQ Transport
Configure the RabbitMQ transport in Mocha for production messaging with automatic topology provisioning, connection management, and prefetch tuning.
The RabbitMQ transport connects Mocha to a RabbitMQ broker for production messaging. It manages connections, provisions exchanges and queues automatically, handles message acknowledgement, and supports request/reply with dedicated reply endpoints. When you need durable, distributed messaging across multiple services, this is the transport to use.
Set up the RabbitMQ transport
By the end of this section, you will have a Mocha bus connected to RabbitMQ with automatic topology provisioning.
Install the package
```bash
dotnet add package Mocha.Transport.RabbitMQ
```
Register with .NET Aspire
The most common setup uses the Aspire RabbitMQ component for connection management:
```bash
dotnet add package Aspire.RabbitMQ.Client
```

```csharp
using Mocha;
using Mocha.Transport.RabbitMQ;

var builder = WebApplication.CreateBuilder(args);

// Aspire registers IConnectionFactory from the "rabbitmq" connection resource
builder.AddRabbitMQClient("rabbitmq");

// Register the message bus with RabbitMQ transport
builder.Services
    .AddMessageBus()
    .AddEventHandler<OrderPlacedEventHandler>()
    .AddRabbitMQ();

var app = builder.Build();
app.Run();
```
The Aspire component reads the connection string from configuration (typically ConnectionStrings:rabbitmq), handles health checks, and integrates with the Aspire dashboard for observability.
.AddRabbitMQ() picks up the IConnectionFactory from DI (registered by Aspire) and uses it to establish connections to the broker. Default conventions automatically create exchanges, queues, and bindings for your registered handlers.
Register with a manual connection string
If you are not using Aspire, register the IConnectionFactory directly:
```csharp
using Mocha;
using Mocha.Transport.RabbitMQ;
using RabbitMQ.Client;

var builder = WebApplication.CreateBuilder(args);

// Register IConnectionFactory manually
builder.Services.AddSingleton<IConnectionFactory>(_ =>
    new ConnectionFactory
    {
        HostName = "localhost",
        Port = 5672,
        VirtualHost = "/",
        UserName = "guest",
        Password = "guest"
    });

builder.Services
    .AddMessageBus()
    .AddEventHandler<OrderPlacedEventHandler>()
    .AddRabbitMQ();

var app = builder.Build();
app.Run();
```
To use a connection string from configuration:
```csharp
builder.Services.AddSingleton<IConnectionFactory>(_ =>
{
    var factory = new ConnectionFactory();
    factory.Uri = new Uri(builder.Configuration.GetConnectionString("rabbitmq")!);
    return factory;
});
```
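As a rough illustration of what such a connection string carries, the sketch below parses a hypothetical AMQP URI with `System.Uri` and pulls out the parts that correspond to the `ConnectionFactory` properties used in the manual registration. The URI value and the `orders` virtual host are made-up examples, not values that Mocha or Aspire require.

```csharp
using System;

// Hypothetical connection string; in a real app this comes from configuration.
var uri = new Uri("amqp://guest:guest@localhost:5672/orders");

// UserInfo has the form "user:password".
var credentials = uri.UserInfo.Split(':', 2);

var host = uri.Host;              // broker host name
var port = uri.Port;              // broker port
var userName = credentials[0];    // user part of the credentials
// The path after the leading slash names the virtual host.
var virtualHost = Uri.UnescapeDataString(uri.AbsolutePath.TrimStart('/'));

Console.WriteLine($"{host}:{port} vhost={virtualHost} user={userName}");
```

Keeping the whole connection in one URI means a single configuration value can switch the application between brokers.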
Use a custom connection provider
For full control over connection lifecycle, provide a custom IRabbitMQConnectionProvider:
```csharp
builder.Services
    .AddMessageBus()
    .AddRabbitMQ(transport =>
    {
        transport.ConnectionProvider(sp =>
        {
            return sp.GetRequiredService<MyCustomConnectionProvider>();
        });
    });
```
The IRabbitMQConnectionProvider interface exposes Host, Port, VirtualHost, and a CreateAsync method. When no custom provider is registered, the transport falls back to resolving IConnectionFactory from DI and wrapping it in a default provider.
Verify it works
Add an endpoint that publishes through the bus and verify the handler executes:
```csharp
app.MapPost("/orders", async (IMessageBus bus) =>
{
    await bus.PublishAsync(new OrderPlacedEvent
    {
        OrderId = Guid.NewGuid(),
        CustomerId = "customer-1",
        TotalAmount = 99.99m
    }, CancellationToken.None);

    return Results.Ok();
});
```
Send a POST request to /orders and check your application logs. You should see the handler process the event. You can also inspect the RabbitMQ management UI at http://localhost:15672 to see the auto-provisioned exchanges and queues.
Two connections per broker transport
Mocha opens two connections to the broker: one for consuming and one for dispatching.
This design prevents back-pressure from slow consumers from blocking outbound message publishing. When a consumer processes messages slowly, the RabbitMQ client applies back-pressure on that connection. Without separation, a slow consumer could prevent your application from publishing new messages entirely. With separate connections, each direction operates independently.
How topology works
When the transport starts, it provisions topology on the broker automatically. Here is how message types map to RabbitMQ resources:
```mermaid
graph LR
    P[Publisher] -->|publish| E[Exchange<br/>order-placed-event]
    E -->|binding| Q[Queue<br/>billing-service]
    Q -->|consume| C[Consumer]
```

Events (publish/subscribe): Each event type gets a fanout exchange. Each service that subscribes creates a queue bound to that exchange. Publishing sends the message to the exchange, which fans it out to all bound queues.
Commands (send): Each command type gets a direct exchange bound to a single queue. Sending delivers the message to exactly one consumer.
Request/reply: The transport creates a temporary reply queue per service instance. The reply address is embedded in the request message so the responder knows where to send the reply.
:::warning Message loss warning
Messages published before the transport completes its Start phase may be lost if no queue is bound to the exchange yet. During deployment, ensure consuming services start before publishing services, or declare an alternate exchange (the `alternate-exchange` argument) to capture unroutable messages.

If a message is published to an exchange with no bound queue - for example, when no consumer has started - that message is dropped. Publisher confirms alone do not reveal this, because the broker also confirms messages that it could not route to any queue. Mocha auto-provisions topology, but the window between exchange creation and queue binding is a real operational risk.
:::
Publisher confirms
Mocha's RabbitMQ transport uses publisher confirms on dispatch, which means the broker acknowledges each published message before the publish call completes. This provides at-least-once delivery guarantees for outbound messages: if the broker does not confirm, the publish fails with an exception. See the RabbitMQ Reliability Guide for a full treatment of delivery guarantees.
Default topology for event handlers
When you register an event handler with AddEventHandler<T>(), the RabbitMQ transport creates this topology:
A fanout exchange named after the message type fans out to per-service exchanges, which bind to per-service queues. This allows multiple services to each receive a copy of every published event.
Default topology for send handlers
When you register a request handler with AddRequestHandler<T>() for send (fire-and-forget), the transport creates a single queue:
Send messages go to a dedicated queue. Only one handler processes each message - this is the point-to-point guarantee.
Configure transport-level defaults
You can set defaults that apply to all auto-provisioned queues and exchanges. This is useful when you want consistent settings across all resources without configuring each one individually.
Use ConfigureDefaults to set queue and exchange defaults:
```csharp
builder.Services
    .AddMessageBus()
    .AddRabbitMQ(transport =>
    {
        transport.ConfigureDefaults(defaults =>
        {
            // All queues will be quorum with a delivery limit of 5
            defaults.Queue.QueueType = RabbitMQQueueType.Quorum;
            defaults.Queue.Arguments["x-delivery-limit"] = 5;

            // All exchanges will use topic routing
            defaults.Exchange.Type = RabbitMQExchangeType.Topic;
        });
    });
```
For example, to enable quorum queues with a specific initial group size:
```csharp
builder.Services
    .AddMessageBus()
    .AddRabbitMQ(transport =>
    {
        transport.ConfigureDefaults(defaults =>
        {
            defaults.Queue.QueueType = RabbitMQQueueType.Quorum;
            defaults.Queue.Arguments["x-quorum-initial-group-size"] = 3;
        });
    });
```
Or to use stream queues for append-only log semantics:
```csharp
builder.Services
    .AddMessageBus()
    .AddRabbitMQ(transport =>
    {
        transport.ConfigureDefaults(defaults =>
        {
            defaults.Queue.QueueType = RabbitMQQueueType.Stream;
        });
    });
```
Available queue defaults:
| Property | Type | Description |
|---|---|---|
| `QueueType` | `string` | Queue type: `RabbitMQQueueType.Classic`, `.Quorum`, or `.Stream` |
| `Durable` | `bool?` | Whether queues survive broker restarts (default: `true`) |
| `AutoDelete` | `bool?` | Whether queues are auto-deleted when unused (default: `false`) |
| `Arguments` | `Dictionary<string, object>` | Additional arguments (e.g., `x-delivery-limit`, `x-max-priority`) |
Available exchange defaults:
| Property | Type | Description |
|---|---|---|
| `Type` | `string` | Exchange type: `RabbitMQExchangeType.Fanout`, `.Direct`, `.Topic`, or `.Headers` |
| `Durable` | `bool?` | Whether exchanges survive broker restarts (default: `true`) |
| `AutoDelete` | `bool?` | Whether exchanges are auto-deleted when unused (default: `false`) |
| `Arguments` | `Dictionary<string, object>` | Additional arguments (e.g., `alternate-exchange`) |
Defaults never override explicitly configured values. If you declare a queue with a specific queue type, that setting takes precedence over the transport default. You can call ConfigureDefaults multiple times - each call accumulates settings on the same defaults object.
Declare custom topology
Mocha auto-provisions topology by default. To declare additional exchanges, queues, or bindings:
```csharp
builder.Services
    .AddMessageBus()
    .AddRabbitMQ(transport =>
    {
        // Declare an exchange
        transport.DeclareExchange("order-events")
            .Type(RabbitMQExchangeType.Fanout)
            .Durable()
            .AutoProvision();

        // Declare a queue (use quorum type for production)
        transport.DeclareQueue("billing-orders")
            .Durable()
            .AutoProvision()
            .WithArgument("x-queue-type", "quorum");

        // Bind the exchange to the queue
        transport.DeclareBinding("order-events", "billing-orders")
            .AutoProvision();
    });
```
All explicitly declared topology is provisioned when the transport starts, before receive endpoints begin consuming.
Control auto-provisioning
By default, the transport auto-provisions all topology resources (exchanges, queues, bindings) on the broker at startup. In production environments where infrastructure is managed externally - for example by Terraform, Ansible, or the RabbitMQ Messaging Topology Operator on Kubernetes - you can disable auto-provisioning so the transport expects resources to already exist.
Disable globally
Turn off auto-provisioning for the entire transport:
```csharp
builder.Services
    .AddMessageBus()
    .AddEventHandler<OrderPlacedEventHandler>()
    .AddRabbitMQ(transport =>
    {
        transport.AutoProvision(false);
    });
```
With auto-provisioning disabled, the transport will not create any exchanges, queues, or bindings. All resources must already exist on the broker before the transport starts.
Override per resource
Individual resources can override the transport-level setting. This is useful when most topology is managed externally but a few resources need to be created dynamically:
```csharp
builder.Services
    .AddMessageBus()
    .AddRabbitMQ(transport =>
    {
        // Disable globally
        transport.AutoProvision(false);

        // This exchange already exists on the broker - skip provisioning
        transport.DeclareExchange("order-events");

        // This queue should be created by the transport
        transport.DeclareQueue("billing-orders")
            .AutoProvision(true);

        // This binding should also be created
        transport.DeclareBinding("order-events", "billing-orders")
            .AutoProvision(true);
    });
```
The effective auto-provision value for each resource follows a cascading pattern:
| Resource setting | Transport setting | Result |
|---|---|---|
| `true` | any | Provisioned |
| `false` | any | Not provisioned |
| not set | `true` (default) | Provisioned |
| not set | `false` | Not provisioned |
When a resource does not specify AutoProvision, it inherits the transport-level default. When the transport does not specify AutoProvision, it defaults to true.
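The cascade can be expressed as a chain of fallbacks. The sketch below is a minimal model of the rule described here, not Mocha's implementation: `ResolveAutoProvision` is a hypothetical helper, and `null` stands for "not set" at that level.

```csharp
using System;

// Hypothetical helper modelling the cascade; not part of the Mocha API.
// The resource setting wins, then the transport setting, then the default (true).
bool ResolveAutoProvision(bool? resourceSetting, bool? transportSetting)
{
    return resourceSetting ?? transportSetting ?? true;
}

Console.WriteLine(ResolveAutoProvision(true, false));   // resource opts in
Console.WriteLine(ResolveAutoProvision(false, true));   // resource opts out
Console.WriteLine(ResolveAutoProvision(null, false));   // inherits transport setting
Console.WriteLine(ResolveAutoProvision(null, null));    // falls back to the default
```

Each of the four table rows corresponds to one branch of the fallback chain, plus the implicit default when neither level is configured.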
Common patterns
Fully managed infrastructure: Disable auto-provisioning globally and declare all resources without AutoProvision. The transport will use existing broker resources without attempting to create them.
```csharp
transport.AutoProvision(false);
transport.DeclareExchange("order-events");
transport.DeclareQueue("billing-orders");
transport.DeclareBinding("order-events", "billing-orders");
```
Selective provisioning: Disable globally but enable for specific resources that are owned by this service.
```csharp
transport.AutoProvision(false);
transport.DeclareExchange("shared-events"); // managed externally
transport.DeclareQueue("my-service-queue")
    .AutoProvision(true); // owned by this service
transport.DeclareBinding("shared-events", "my-service-queue")
    .AutoProvision(true); // owned by this service
```
Kubernetes with the Messaging Topology Operator: When the RabbitMQ Messaging Topology Operator manages your exchanges, queues, and bindings as Kubernetes custom resources, disable auto-provisioning entirely. The operator declares topology through CRDs, and the transport simply uses the existing resources:
```yaml
# Kubernetes CRD - managed by the Messaging Topology Operator
apiVersion: rabbitmq.com/v1beta1
kind: Queue
metadata:
  name: billing-orders
spec:
  name: billing-orders
  durable: true
  rabbitmqClusterReference:
    name: my-cluster
```

```csharp
// Application code - topology already exists on the broker
transport.AutoProvision(false);
transport.DeclareExchange("order-events");
transport.DeclareQueue("billing-orders");
transport.DeclareBinding("order-events", "billing-orders");
```
Opt-out individual resources: Keep auto-provisioning enabled but skip specific resources that are managed elsewhere.
```csharp
transport.DeclareExchange("platform-events")
    .AutoProvision(false); // managed by platform team
transport.DeclareQueue("my-queue"); // auto-provisioned (default)
transport.DeclareBinding("platform-events", "my-queue"); // auto-provisioned (default)
```
Prefetch and concurrency
Use transport.Handler<T>() to claim a handler and configure prefetch and concurrency on its convention-named endpoint:
```csharp
builder.Services
    .AddMessageBus()
    .AddEventHandler<OrderPlacedEventHandler>()
    .AddRabbitMQ(transport =>
    {
        transport.Handler<OrderPlacedEventHandler>()
            .ConfigureEndpoint(e => e.MaxPrefetch(50).MaxConcurrency(10));
    });
```
This keeps the convention-derived endpoint name while tuning the consumer settings. ConfigureEndpoint() can be called multiple times - actions compose in declaration order:
```csharp
transport.Handler<OrderPlacedEventHandler>()
    .ConfigureEndpoint(e => e.MaxPrefetch(50))
    .ConfigureEndpoint(e => e.MaxConcurrency(10))
    .ConfigureEndpoint(e => e.FaultEndpoint("order-errors"));
```
For full control over the endpoint name and queue, use explicit binding with Endpoint("name"):
```csharp
builder.Services
    .AddMessageBus()
    .AddEventHandler<OrderPlacedEventHandler>()
    .AddRabbitMQ(transport =>
    {
        transport.BindHandlersExplicitly();

        transport.Endpoint("order-processing")
            .Queue("orders.processing")
            .MaxPrefetch(50)
            .MaxConcurrency(10)
            .Handler<OrderPlacedEventHandler>();
    });
```
MaxPrefetch controls how many unacknowledged messages RabbitMQ delivers to the consumer at once. Default: 100. Lower values reduce memory pressure under high load. Higher values improve throughput for fast handlers.
MaxConcurrency controls how many messages the endpoint processes in parallel. Set this based on your handler's throughput characteristics.
A good starting point: set MaxPrefetch equal to or slightly higher than MaxConcurrency. For slow handlers (long database operations, external API calls), lower MaxPrefetch to 10–20 to prevent messages from piling up in the consumer's unacknowledged buffer. For quorum queues specifically, avoid setting MaxPrefetch to 1 - a prefetch of 1 starves consumers while acknowledgements flow through the consensus mechanism and significantly reduces throughput.
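To see why a large prefetch hurts slow handlers, the sketch below estimates how long the last prefetched message sits in the consumer's buffer before a handler picks it up. This is back-of-the-envelope arithmetic, not a Mocha API: `WorstCaseBufferWait` is a hypothetical helper, and it assumes every handler call takes the same time and the prefetch buffer is full at the start.

```csharp
using System;

// Hypothetical estimate, not part of Mocha: time the last buffered message
// waits before processing starts, given uniform handler duration.
TimeSpan WorstCaseBufferWait(int maxPrefetch, int maxConcurrency, TimeSpan handlerTime)
{
    // Message i (0-based) starts after floor(i / maxConcurrency) full handler rounds.
    long roundsBeforeLast = (maxPrefetch - 1) / maxConcurrency;
    return TimeSpan.FromTicks(handlerTime.Ticks * roundsBeforeLast);
}

var handlerTime = TimeSpan.FromSeconds(2);

// Default prefetch of 100 with 10 parallel handlers.
Console.WriteLine(WorstCaseBufferWait(100, 10, handlerTime).TotalSeconds);

// Lowered prefetch of 20 with the same concurrency.
Console.WriteLine(WorstCaseBufferWait(20, 10, handlerTime).TotalSeconds);
```

Under these assumptions the last message waits 18 seconds with a prefetch of 100 but only 2 seconds with a prefetch of 20. During that wait it is held by this consumer and cannot be picked up by another instance.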
For prefetch tuning guidance from first principles, see CloudAMQP Best Practices.
Auto-provisioned resource naming
| Resource | Naming convention | Created when |
|---|---|---|
| Exchange (event) | Message type name (e.g., OrderPlacedEvent) | First publish or subscribe |
| Exchange (command) | Message type name (e.g., ReserveInventoryCommand) | First send or handler registration |
| Queue | Endpoint name derived from handler registration | Handler is bound to the transport |
| Reply queue | Instance-specific name | Transport starts |
| Bindings | Exchange-to-queue | Endpoint discovery phase |
All auto-provisioned resources are durable by default and survive broker restarts.
Routing keys
RabbitMQ uses a routing_key field on every published message to decide which queues receive it. When you publish to a topic exchange, the broker compares the message's routing key against binding patterns on each queue. Queues whose pattern matches get the message. Queues that don't match never see it.
Direct exchanges work the same way, but require an exact match instead of a pattern.
Fanout exchanges ignore routing keys entirely - every bound queue gets every message.
Routing keys are useful when you need to split a single message stream across different consumers based on a property of the message itself:
- Disconnecting producers from consumers - publish messages without knowing which queues or services will consume them. Consumers can bind with patterns to receive only the messages they care about.
- Multi-tenant routing - route messages to tenant-specific queues (`tenant-a.orders`, `tenant-b.orders`)
- Region-based routing - route to regional processors (`us.east`, `eu.west`)
- Priority routing - separate high-priority and low-priority messages (`priority.high`, `priority.low`)
For a full treatment of topic exchange routing, see the RabbitMQ Topics Tutorial.
Configure routing key extraction
To set a routing key on published messages, call UseRabbitMQRoutingKey<T>() when registering the message type:
```csharp
builder.Services
    .AddMessageBus()
    .AddMessage<OrderEvent>(m => m
        .UseRabbitMQRoutingKey<OrderEvent>(msg => msg.Region))
    .AddRabbitMQ();
```
The extractor function runs at dispatch time for each message. It receives the message instance and returns the routing key string. Return null to publish without a routing key.
UseRabbitMQRoutingKey<T>() is configured on AddMessage<T>(), not on the transport or endpoint. This keeps routing key logic next to the message definition where it belongs.
Composite routing keys
Combine multiple properties into a single routing key using string interpolation:
```csharp
builder.Services
    .AddMessageBus()
    .AddMessage<OrderEvent>(m => m
        .UseRabbitMQRoutingKey<OrderEvent>(msg => $"{msg.TenantId}.{msg.Region}"))
    .AddRabbitMQ();
```
This produces routing keys like `acme.us.east` or `contoso.eu.west`, which you can match with topic exchange binding patterns like `acme.#` or `*.eu.*`.
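One detail worth checking when composing keys is how many dot-separated words the result contains, because topic patterns match word by word. The sketch below is a standalone illustration: the tuple stands in for the `OrderEvent` type, whose real shape is not shown in this guide, and the delegate mirrors the extractor lambda.

```csharp
using System;

// Hypothetical stand-in for OrderEvent, modelled as a named tuple.
Func<(string TenantId, string Region), string> routingKey =
    msg => $"{msg.TenantId}.{msg.Region}";

var key = routingKey(("contoso", "eu.west"));

// Topic exchanges split the key into words on dots.
var words = key.Split('.');

Console.WriteLine(key);            // the composed routing key
Console.WriteLine(words.Length);   // number of words the broker will see
```

Because `Region` itself contains a dot, the key has three words. That is why the three-word pattern `*.eu.*` matches `contoso.eu.west`, while a two-word pattern such as `*.eu` would not.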
Topic exchange example
This example routes region-tagged events to different queues based on their routing key. The US queue receives messages matching `us.*`, and the EU queue receives messages matching `eu.*`.
```mermaid
graph LR
    P[Publisher] -->|"region = us.east"| E[Topic Exchange<br/>region-events]
    E -->|"us.* ✓"| QA[Queue<br/>us-orders]
    E -->|"eu.* ✗"| QB[Queue<br/>eu-orders]
    QA --> CA[US Consumer]
    style QB stroke-dasharray: 5 5
```

Define the message type
```csharp
public sealed class RegionEvent
{
    public required string Region { get; init; }
    public required string Payload { get; init; }
}
```
Wire up the bus
```csharp
builder.Services
    .AddMessageBus()
    .AddConsumer<UsRegionConsumer>()
    .AddConsumer<EuRegionConsumer>()
    .AddMessage<RegionEvent>(m => m
        .UseRabbitMQRoutingKey<RegionEvent>(msg => msg.Region))
    .AddRabbitMQ(transport =>
    {
        transport.BindHandlersExplicitly();

        // Declare topology: one topic exchange, two queues, two bindings with patterns
        transport.DeclareExchange("region-events")
            .Type(RabbitMQExchangeType.Topic);

        transport.DeclareQueue("us-orders");
        transport.DeclareQueue("eu-orders");

        transport.DeclareBinding("region-events", "us-orders")
            .RoutingKey("us.*");
        transport.DeclareBinding("region-events", "eu-orders")
            .RoutingKey("eu.*");

        // Bind consumers to queues
        transport.Endpoint("us-ep")
            .Consumer<UsRegionConsumer>()
            .Queue("us-orders");
        transport.Endpoint("eu-ep")
            .Consumer<EuRegionConsumer>()
            .Queue("eu-orders");

        // Dispatch to the topic exchange
        transport.DispatchEndpoint("region-dispatch")
            .ToExchange("region-events")
            .Publish<RegionEvent>();
    });
```
When you publish a RegionEvent with Region = "us.east", the routing key middleware extracts "us.east" from the message and sets it on the AMQP publish. The topic exchange matches "us.east" against `us.*` (match) and `eu.*` (no match). Only the US queue receives the message.
Topic exchange binding patterns
| Pattern | Matches | Does not match |
|---|---|---|
| `us.*` | `us.east`, `us.west` | `us.east.az1`, `eu.west` |
| `eu.#` | `eu.west`, `eu.west.az1` | `us.east` |
| `#` | Everything | - |
| `*.*.az1` | `us.east.az1`, `eu.west.az1` | `us.east` |

`*` matches exactly one word. `#` matches zero or more words. Words are separated by dots.
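The matching rules can be checked locally with a small word-by-word matcher. The sketch below is an illustrative reimplementation of the rules stated here, not the broker's algorithm and not a Mocha API: `Matches` and `MatchFrom` are hypothetical names, and the sketch assumes non-empty routing keys.

```csharp
using System;

// Hypothetical matcher: splits pattern and key into words and compares them.
bool Matches(string pattern, string routingKey)
{
    return MatchFrom(pattern.Split('.'), 0, routingKey.Split('.'), 0);
}

bool MatchFrom(string[] p, int pi, string[] k, int ki)
{
    // Pattern exhausted: match only if the key is exhausted too.
    if (pi == p.Length) return ki == k.Length;

    if (p[pi] == "#")
    {
        // '#' may absorb zero or more words: try every possible split point.
        for (var next = ki; next <= k.Length; next++)
        {
            if (MatchFrom(p, pi + 1, k, next)) return true;
        }
        return false;
    }

    // '*' and literal words both consume exactly one word.
    if (ki == k.Length) return false;
    return (p[pi] == "*" || p[pi] == k[ki]) && MatchFrom(p, pi + 1, k, ki + 1);
}

Console.WriteLine(Matches("us.*", "us.east"));        // one word after "us"
Console.WriteLine(Matches("us.*", "us.east.az1"));    // two words after "us"
Console.WriteLine(Matches("eu.#", "eu.west.az1"));    // '#' absorbs two words
Console.WriteLine(Matches("*.*.az1", "us.east"));     // key is one word short
```

A matcher like this is handy in unit tests to confirm that a binding pattern accepts the keys your extractor produces before deploying the topology.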
Direct exchange routing keys
Direct exchanges use exact-match routing keys instead of patterns. A message with routing key "priority-high" reaches only queues bound with exactly "priority-high".
```csharp
builder.Services
    .AddMessageBus()
    .AddConsumer<HighPriorityConsumer>()
    .AddMessage<TaskEvent>(m => m
        .UseRabbitMQRoutingKey<TaskEvent>(msg => $"priority-{msg.Priority}"))
    .AddRabbitMQ(transport =>
    {
        transport.BindHandlersExplicitly();

        transport.DeclareExchange("task-routing")
            .Type(RabbitMQExchangeType.Direct);

        transport.DeclareQueue("high-priority-tasks");
        transport.DeclareBinding("task-routing", "high-priority-tasks")
            .RoutingKey("priority-high");

        transport.Endpoint("high-priority-ep")
            .Consumer<HighPriorityConsumer>()
            .Queue("high-priority-tasks");

        transport.DispatchEndpoint("task-dispatch")
            .ToExchange("task-routing")
            .Publish<TaskEvent>();
    });
```
Messages with Priority = "high" reach the queue. Messages with any other priority are dropped by the exchange (unless another queue is bound with a matching routing key).
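The drop behavior is easy to model in memory. The sketch below is a simplified stand-in for a direct exchange, assuming only the single `priority-high` binding from this example; `Route` is a hypothetical helper and no broker is involved.

```csharp
using System;
using System.Collections.Generic;

// Binding key -> bound queues, mirroring the single binding in this example.
var bindings = new Dictionary<string, List<string>>
{
    ["priority-high"] = new List<string> { "high-priority-tasks" }
};

// Direct exchanges deliver only on an exact routing-key match.
IReadOnlyList<string> Route(string routingKey)
{
    if (bindings.TryGetValue(routingKey, out var queues)) return queues;
    return Array.Empty<string>(); // no binding: the message is dropped
}

Console.WriteLine(string.Join(",", Route("priority-high")));  // bound queue name
Console.WriteLine(Route("priority-low").Count);               // nothing receives it
```

Adding a second entry to `bindings` for `priority-low` corresponds to declaring another queue and binding, which is how the other priorities would be kept rather than dropped.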
Next steps
- Transports Overview - Understand the transport abstraction and lifecycle.
- Handlers and Consumers - Learn about handler types and consumer configuration.
- Reliability - Configure dead-letter routing, outbox, inbox, and fault handling.
Runnable example: RabbitMQ
Full demo: All three Demo services use RabbitMQ in production mode with .NET Aspire. See Demo.AppHost for the Aspire orchestration and Demo.Catalog for a complete service using `.AddRabbitMQ()` with outbox, inbox, sagas, and multiple handler types.