RabbitMQ Transport

Configure the RabbitMQ transport in Mocha for production messaging with automatic topology provisioning, connection management, and prefetch tuning.

RabbitMQ transport

The RabbitMQ transport connects Mocha to a RabbitMQ broker for production messaging. It manages connections, provisions exchanges and queues automatically, handles message acknowledgement, and supports request/reply with dedicated reply endpoints. When you need durable, distributed messaging across multiple services, this is the transport to use.

Set up the RabbitMQ transport

By the end of this section, you will have a Mocha bus connected to RabbitMQ with automatic topology provisioning.

Install the package

Bash
dotnet add package Mocha.Transport.RabbitMQ

Register with .NET Aspire

The most common setup uses the Aspire RabbitMQ component for connection management:

Bash
dotnet add package Aspire.RabbitMQ.Client
C#
using Mocha;
using Mocha.Transport.RabbitMQ;
var builder = WebApplication.CreateBuilder(args);
// Aspire registers IConnectionFactory from the "rabbitmq" connection resource
builder.AddRabbitMQClient("rabbitmq");
// Register the message bus with RabbitMQ transport
builder.Services
.AddMessageBus()
.AddEventHandler<OrderPlacedEventHandler>()
.AddRabbitMQ();
var app = builder.Build();
app.Run();

The Aspire component reads the connection string from configuration (typically ConnectionStrings:rabbitmq), handles health checks, and integrates with the Aspire dashboard for observability.

.AddRabbitMQ() picks up the IConnectionFactory from DI (registered by Aspire) and uses it to establish connections to the broker. Default conventions automatically create exchanges, queues, and bindings for your registered handlers.

Register with a manual connection string

If you are not using Aspire, register the IConnectionFactory directly:

C#
using Mocha;
using Mocha.Transport.RabbitMQ;
using RabbitMQ.Client;
var builder = WebApplication.CreateBuilder(args);
// Register IConnectionFactory manually
builder.Services.AddSingleton<IConnectionFactory>(_ =>
new ConnectionFactory
{
HostName = "localhost",
Port = 5672,
VirtualHost = "/",
UserName = "guest",
Password = "guest"
});
builder.Services
.AddMessageBus()
.AddEventHandler<OrderPlacedEventHandler>()
.AddRabbitMQ();
var app = builder.Build();
app.Run();

To use a connection string from configuration:

C#
builder.Services.AddSingleton<IConnectionFactory>(_ =>
{
var factory = new ConnectionFactory();
factory.Uri = new Uri(builder.Configuration.GetConnectionString("rabbitmq")!);
return factory;
});

Use a custom connection provider

For full control over connection lifecycle, provide a custom IRabbitMQConnectionProvider:

C#
builder.Services
.AddMessageBus()
.AddRabbitMQ(transport =>
{
transport.ConnectionProvider(sp =>
{
return sp.GetRequiredService<MyCustomConnectionProvider>();
});
});

The IRabbitMQConnectionProvider interface exposes Host, Port, VirtualHost, and a CreateAsync method. When no custom provider is registered, the transport falls back to resolving IConnectionFactory from DI and wrapping it in a default provider.

Verify it works

Add an endpoint that publishes through the bus and verify the handler executes:

C#
app.MapPost("/orders", async (IMessageBus bus) =>
{
await bus.PublishAsync(new OrderPlacedEvent
{
OrderId = Guid.NewGuid(),
CustomerId = "customer-1",
TotalAmount = 99.99m
}, CancellationToken.None);
return Results.Ok();
});

Send a POST request to /orders and check your application logs. You should see the handler process the event. You can also inspect the RabbitMQ management UI at http://localhost:15672 to see the auto-provisioned exchanges and queues.

Two connections per broker transport

Mocha opens two connections to the broker: one for consuming and one for dispatching.

This design prevents back-pressure from slow consumers from blocking outbound message publishing. When a consumer processes messages slowly, the RabbitMQ client applies back-pressure on that connection. Without separation, a slow consumer could prevent your application from publishing new messages entirely. With separate connections, each direction operates independently.

How topology works

When the transport starts, it provisions topology on the broker automatically. Here is how message types map to RabbitMQ resources:

graph LR
    P[Publisher] -->|publish| E[Exchange<br/>order-placed-event]
    E -->|binding| Q[Queue<br/>billing-service]
    Q -->|consume| C[Consumer]

Events (publish/subscribe): Each event type gets a fanout exchange. Each service that subscribes creates a queue bound to that exchange. Publishing sends the message to the exchange, which fans it out to all bound queues.

Commands (send): Each command type gets a direct exchange bound to a single queue. Sending delivers the message to exactly one consumer.

Request/reply: The transport creates a temporary reply queue per service instance. The reply address is embedded in the request message so the responder knows where to send the reply.

:::warning Message loss warning. Messages published before the transport completes its Start phase may be lost if no queue is bound to the exchange yet. During deployment, ensure consuming services start before publishing services, or use publisher confirms to detect lost messages.

If a message is published to an exchange with no bound queue - for example, when no consumer has started - that message is dropped. Mocha auto-provisions topology, but the window between exchange creation and queue binding is a real operational risk. :::

Publisher confirms

Mocha's RabbitMQ transport uses publisher confirms on dispatch, which means the broker acknowledges each published message before the publish call completes. This provides at-least-once delivery guarantees for outbound messages: if the broker does not confirm, the publish fails with an exception. See the RabbitMQ Reliability Guide for a full treatment of delivery guarantees.

Default topology for event handlers

When you register an event handler with AddEventHandler<T>(), the RabbitMQ transport creates this topology:

A fanout exchange named after the message type fans out to per-service exchanges, which bind to per-service queues. This allows multiple services to each receive a copy of every published event.

Default topology for send handlers

When you register a request handler with AddRequestHandler<T>() for send (fire-and-forget), the transport creates a single queue:

Send messages go to a dedicated queue. Only one handler processes each message - this is the point-to-point guarantee.

Configure transport-level defaults

You can set defaults that apply to all auto-provisioned queues and exchanges. This is useful when you want consistent settings across all resources without configuring each one individually.

Use ConfigureDefaults to set queue and exchange defaults:

C#
builder.Services
.AddMessageBus()
.AddRabbitMQ(transport =>
{
transport.ConfigureDefaults(defaults =>
{
// All queues will be quorum with a delivery limit of 5
defaults.Queue.QueueType = RabbitMQQueueType.Quorum;
defaults.Queue.Arguments["x-delivery-limit"] = 5;
// All exchanges will use topic routing
defaults.Exchange.Type = RabbitMQExchangeType.Topic;
});
});

For example, to enable quorum queues with a specific initial group size:

C#
builder.Services
.AddMessageBus()
.AddRabbitMQ(transport =>
{
transport.ConfigureDefaults(defaults =>
{
defaults.Queue.QueueType = RabbitMQQueueType.Quorum;
defaults.Queue.Arguments["x-quorum-initial-group-size"] = 3;
});
});

Or to use stream queues for append-only log semantics:

C#
builder.Services
.AddMessageBus()
.AddRabbitMQ(transport =>
{
transport.ConfigureDefaults(defaults =>
{
defaults.Queue.QueueType = RabbitMQQueueType.Stream;
});
});

Available queue defaults:

PropertyTypeDescription
QueueTypestringQueue type: RabbitMQQueueType.Classic, .Quorum, or .Stream
Durablebool?Whether queues survive broker restarts (default: true)
AutoDeletebool?Whether queues are auto-deleted when unused (default: false)
ArgumentsDictionary<string, object>Additional arguments (e.g., x-delivery-limit, x-max-priority)

Available exchange defaults:

PropertyTypeDescription
TypestringExchange type: RabbitMQExchangeType.Fanout, .Direct, .Topic, or .Headers
Durablebool?Whether exchanges survive broker restarts (default: true)
AutoDeletebool?Whether exchanges are auto-deleted when unused (default: false)
ArgumentsDictionary<string, object>Additional arguments (e.g., alternate-exchange)

Defaults never override explicitly configured values. If you declare a queue with a specific queue type, that setting takes precedence over the transport default. You can call ConfigureDefaults multiple times - each call accumulates settings on the same defaults object.

Declare custom topology

Mocha auto-provisions topology by default. To declare additional exchanges, queues, or bindings:

C#
builder.Services
.AddMessageBus()
.AddRabbitMQ(transport =>
{
// Declare an exchange
transport.DeclareExchange("order-events")
.Type(RabbitMQExchangeType.Fanout)
.Durable()
.AutoProvision();
// Declare a queue (use quorum type for production)
transport.DeclareQueue("billing-orders")
.Durable()
.AutoProvision()
.WithArgument("x-queue-type", "quorum");
// Bind the exchange to the queue
transport.DeclareBinding("order-events", "billing-orders")
.AutoProvision();
});

All explicitly declared topology is provisioned when the transport starts, before receive endpoints begin consuming.

Control auto-provisioning

By default, the transport auto-provisions all topology resources (exchanges, queues, bindings) on the broker at startup. In production environments where infrastructure is managed externally - for example by Terraform, Ansible, or the RabbitMQ Messaging Topology Operator on Kubernetes - you can disable auto-provisioning so the transport expects resources to already exist.

Disable globally

Turn off auto-provisioning for the entire transport:

C#
builder.Services
.AddMessageBus()
.AddEventHandler<OrderPlacedEventHandler>()
.AddRabbitMQ(transport =>
{
transport.AutoProvision(false);
});

With auto-provisioning disabled, the transport will not create any exchanges, queues, or bindings. All resources must already exist on the broker before the transport starts.

Override per resource

Individual resources can override the transport-level setting. This is useful when most topology is managed externally but a few resources need to be created dynamically:

C#
builder.Services
.AddMessageBus()
.AddRabbitMQ(transport =>
{
// Disable globally
transport.AutoProvision(false);
// This exchange already exists on the broker - skip provisioning
transport.DeclareExchange("order-events");
// This queue should be created by the transport
transport.DeclareQueue("billing-orders")
.AutoProvision(true);
// This binding should also be created
transport.DeclareBinding("order-events", "billing-orders")
.AutoProvision(true);
});

The effective auto-provision value for each resource follows a cascading pattern:

Resource settingTransport settingResult
trueanyProvisioned
falseanyNot provisioned
not settrue (default)Provisioned
not setfalseNot provisioned

When a resource does not specify AutoProvision, it inherits the transport-level default. When the transport does not specify AutoProvision, it defaults to true.

Common patterns

Fully managed infrastructure: Disable auto-provisioning globally and declare all resources without AutoProvision. The transport will use existing broker resources without attempting to create them.

C#
transport.AutoProvision(false);
transport.DeclareExchange("order-events");
transport.DeclareQueue("billing-orders");
transport.DeclareBinding("order-events", "billing-orders");

Selective provisioning: Disable globally but enable for specific resources that are owned by this service.

C#
transport.AutoProvision(false);
transport.DeclareExchange("shared-events"); // managed externally
transport.DeclareQueue("my-service-queue")
.AutoProvision(true); // owned by this service
transport.DeclareBinding("shared-events", "my-service-queue")
.AutoProvision(true); // owned by this service

Kubernetes with the Messaging Topology Operator: When the RabbitMQ Messaging Topology Operator manages your exchanges, queues, and bindings as Kubernetes custom resources, disable auto-provisioning entirely. The operator declares topology through CRDs, and the transport simply uses the existing resources:

# Kubernetes CRD - managed by the Messaging Topology Operator
apiVersion: rabbitmq.com/v1beta1
kind: Queue
metadata:
name: billing-orders
spec:
name: billing-orders
durable: true
rabbitmqClusterReference:
name: my-cluster
C#
// Application code - topology already exists on the broker
transport.AutoProvision(false);
transport.DeclareExchange("order-events");
transport.DeclareQueue("billing-orders");
transport.DeclareBinding("order-events", "billing-orders");

Opt-out individual resources: Keep auto-provisioning enabled but skip specific resources that are managed elsewhere.

C#
transport.DeclareExchange("platform-events")
.AutoProvision(false); // managed by platform team
transport.DeclareQueue("my-queue"); // auto-provisioned (default)
transport.DeclareBinding("platform-events", "my-queue"); // auto-provisioned (default)

Prefetch and concurrency

Use transport.Handler<T>() to claim a handler and configure prefetch and concurrency on its convention-named endpoint:

C#
builder.Services
.AddMessageBus()
.AddEventHandler<OrderPlacedEventHandler>()
.AddRabbitMQ(transport =>
{
transport.Handler<OrderPlacedEventHandler>()
.ConfigureEndpoint(e => e.MaxPrefetch(50).MaxConcurrency(10));
});

This keeps the convention-derived endpoint name while tuning the consumer settings. ConfigureEndpoint() can be called multiple times - actions compose in declaration order:

C#
transport.Handler<OrderPlacedEventHandler>()
.ConfigureEndpoint(e => e.MaxPrefetch(50))
.ConfigureEndpoint(e => e.MaxConcurrency(10))
.ConfigureEndpoint(e => e.FaultEndpoint("order-errors"));

For full control over the endpoint name and queue, use explicit binding with Endpoint("name"):

C#
builder.Services
.AddMessageBus()
.AddEventHandler<OrderPlacedEventHandler>()
.AddRabbitMQ(transport =>
{
transport.BindHandlersExplicitly();
transport.Endpoint("order-processing")
.Queue("orders.processing")
.MaxPrefetch(50)
.MaxConcurrency(10)
.Handler<OrderPlacedEventHandler>();
});

MaxPrefetch controls how many unacknowledged messages RabbitMQ delivers to the consumer at once. Default: 100. Lower values reduce memory pressure under high load. Higher values improve throughput for fast handlers.

MaxConcurrency controls how many messages the endpoint processes in parallel. Set this based on your handler's throughput characteristics.

A good starting point: set MaxPrefetch equal to or slightly higher than MaxConcurrency. For slow handlers (long database operations, external API calls), lower MaxPrefetch to 10–20 to prevent messages from piling up in the consumer's unacknowledged buffer. For quorum queues specifically, avoid setting MaxPrefetch to 1 - a prefetch of 1 starves consumers while acknowledgements flow through the consensus mechanism and significantly reduces throughput.

For prefetch tuning guidance from first principles, see CloudAMQP Best Practices.

Auto-provisioned resource naming

ResourceNaming conventionCreated when
Exchange (event)Message type name (e.g., OrderPlacedEvent)First publish or subscribe
Exchange (command)Message type name (e.g., ReserveInventoryCommand)First send or handler registration
QueueEndpoint name derived from handler registrationHandler is bound to the transport
Reply queueInstance-specific nameTransport starts
BindingsExchange-to-queueEndpoint discovery phase

All auto-provisioned resources are durable by default and survive broker restarts.

Routing keys

RabbitMQ uses a routing_key field on every published message to decide which queues receive it. When you publish to a topic exchange, the broker compares the message's routing key against binding patterns on each queue. Queues whose pattern matches get the message. Queues that don't match never see it.

Direct exchanges work the same way, but require an exact match instead of a pattern.

Fanout exchanges ignore routing keys entirely - every bound queue gets every message.

Routing keys are useful when you need to split a single message stream across different consumers based on a property of the message itself:

  • Disconnecting producers from consumers - publish messages without knowing which queues or services will consume them. Consumers can bind with patterns to receive only the messages they care about.
  • Multi-tenant routing - route messages to tenant-specific queues (tenant-a.orders, tenant-b.orders)
  • Region-based routing - route to regional processors (us.east, eu.west)
  • Priority routing - separate high-priority and low-priority messages (priority.high, priority.low)

For a full treatment of topic exchange routing, see the RabbitMQ Topics Tutorial.

Configure routing key extraction

To set a routing key on published messages, call UseRabbitMQRoutingKey<T>() when registering the message type:

C#
builder.Services
.AddMessageBus()
.AddMessage<OrderEvent>(m => m
.UseRabbitMQRoutingKey<OrderEvent>(msg => msg.Region))
.AddRabbitMQ();

The extractor function runs at dispatch time for each message. It receives the message instance and returns the routing key string. Return null to publish without a routing key.

UseRabbitMQRoutingKey<T>() is configured on AddMessage<T>(), not on the transport or endpoint. This keeps routing key logic next to the message definition where it belongs.

Composite routing keys

Combine multiple properties into a single routing key using string interpolation:

C#
builder.Services
.AddMessageBus()
.AddMessage<OrderEvent>(m => m
.UseRabbitMQRoutingKey<OrderEvent>(msg => $"{msg.TenantId}.{msg.Region}"))
.AddRabbitMQ();

This produces routing keys like acme.us.east or contoso.eu.west, which you can match with topic exchange binding patterns like acme.# or *.eu.*.

Topic exchange example

This example routes region-tagged events to different queues based on their routing key. The US queue receives messages matching us.*, and the EU queue receives messages matching eu.*.

graph LR
    P[Publisher] -->|"region = us.east"| E[Topic Exchange<br/>region-events]
    E -->|"us.* ✓"| QA[Queue<br/>us-orders]
    E -->|"eu.* ✗"| QB[Queue<br/>eu-orders]
    QA --> CA[US Consumer]

    style QB stroke-dasharray: 5 5

Define the message type

C#
public sealed class RegionEvent
{
public required string Region { get; init; }
public required string Payload { get; init; }
}

Wire up the bus

C#
builder.Services
.AddMessageBus()
.AddConsumer<UsRegionConsumer>()
.AddConsumer<EuRegionConsumer>()
.AddMessage<RegionEvent>(m => m
.UseRabbitMQRoutingKey<RegionEvent>(msg => msg.Region))
.AddRabbitMQ(transport =>
{
transport.BindHandlersExplicitly();
// Declare topology: one topic exchange, two queues, two bindings with patterns
transport.DeclareExchange("region-events")
.Type(RabbitMQExchangeType.Topic);
transport.DeclareQueue("us-orders");
transport.DeclareQueue("eu-orders");
transport.DeclareBinding("region-events", "us-orders")
.RoutingKey("us.*");
transport.DeclareBinding("region-events", "eu-orders")
.RoutingKey("eu.*");
// Bind consumers to queues
transport.Endpoint("us-ep")
.Consumer<UsRegionConsumer>()
.Queue("us-orders");
transport.Endpoint("eu-ep")
.Consumer<EuRegionConsumer>()
.Queue("eu-orders");
// Dispatch to the topic exchange
transport.DispatchEndpoint("region-dispatch")
.ToExchange("region-events")
.Publish<RegionEvent>();
});

When you publish a RegionEvent with Region = "us.east", the routing key middleware extracts "us.east" from the message and sets it on the AMQP publish. The topic exchange matches "us.east" against us.* (match) and eu.* (no match). Only the US queue receives the message.

Topic exchange binding patterns

PatternMatchesDoes not match
us.*us.east, us.westus.east.az1, eu.west
eu.#eu.west, eu.west.az1us.east
#Everything-
*.*.az1us.east.az1, eu.west.az1us.east

* matches exactly one word. # matches zero or more words. Words are separated by dots.

Direct exchange routing keys

Direct exchanges use exact-match routing keys instead of patterns. A message with routing key "priority-high" reaches only queues bound with exactly "priority-high".

C#
builder.Services
.AddMessageBus()
.AddConsumer<HighPriorityConsumer>()
.AddMessage<TaskEvent>(m => m
.UseRabbitMQRoutingKey<TaskEvent>(msg => $"priority-{msg.Priority}"))
.AddRabbitMQ(transport =>
{
transport.BindHandlersExplicitly();
transport.DeclareExchange("task-routing")
.Type(RabbitMQExchangeType.Direct);
transport.DeclareQueue("high-priority-tasks");
transport.DeclareBinding("task-routing", "high-priority-tasks")
.RoutingKey("priority-high");
transport.Endpoint("high-priority-ep")
.Consumer<HighPriorityConsumer>()
.Queue("high-priority-tasks");
transport.DispatchEndpoint("task-dispatch")
.ToExchange("task-routing")
.Publish<TaskEvent>();
});

Messages with Priority = "high" reach the queue. Messages with any other priority are dropped by the exchange (unless another queue is bound with a matching routing key).

Next steps

Runnable example: RabbitMQ

Full demo: All three Demo services use RabbitMQ in production mode with .NET Aspire. See Demo.AppHost for the Aspire orchestration and Demo.Catalog for a complete service using .AddRabbitMQ() with outbox, inbox, sagas, and multiple handler types.

Last updated on April 13, 2026 by Michael Staib