RabbitMQ Transport


Note! Jasper uses the Rabbit MQ .Net Client to connect to Rabbit MQ.

Quick Start

If you're starting a fresh project, you can quickly spin up a new Jasper project using Rabbit MQ with a dotnet new template.

First install the JasperTemplates nuget like so:

dotnet new --install JasperTemplates

Then build out the directory for your intended project, and use:

dotnet new jasper.rabbitmq

Then check the README.md file in the generated directory for an overview of what was generated for you.

Installing

All the code samples in this section are from the Ping/Pong with Rabbit MQ sample project.

To use RabbitMQ as a transport with Jasper, first install the Jasper.RabbitMQ library via nuget to your project. Behind the scenes, this package uses the RabbitMQ C# Client to both send and receive messages from RabbitMQ.


internal class JasperConfig : JasperOptions
{
    public JasperConfig()
    {
        Endpoints
            .ListenToRabbitQueue("pongs")

            // With the Rabbit MQ transport, you probably
            // want to explicitly designate a specific queue or topic
            // for replies
            .UseForReplies();

        Endpoints.PublishAllMessages().ToRabbit("pings");

        // Configure Rabbit MQ connections and optionally declare Rabbit MQ
        // objects through an extension method on JasperOptions.Endpoints
        Endpoints.ConfigureRabbitMq(rabbit =>
        {
            // Using a local installation of Rabbit MQ
            // via a running Docker image
            rabbit.ConnectionFactory.HostName = "localhost";

            // This directs Jasper to try to create any
            // missing Rabbit MQ objects that are declared
            // in this JasperOptions class
            rabbit.AutoProvision = true;

            rabbit.DeclareQueue("pongs");
            rabbit.DeclareQueue("pings");
        });

        // You an register additional IoC services
        // directly in the JasperOptions with either
        // Lamar specific registrations or in this case,
        // the built in DI abstractions in .Net Core

        // Because Jasper rides on top of the built in
        // .Net Core generic host, you can use the
        // IHostedService
        Services.AddHostedService<PingerService>();
    }

}

See the Rabbit MQ .Net Client documentation for more information about configuring the ConnectionFactory to connect to Rabbit MQ.

All the calls to Declare*****() are optional helpers for auto-provisioning Rabbit MQ objects on application startup. This is probably only useful for development or testing, but it's there.

Subscribe and Publish Messages to a named Queue or Routing Key

In terms of publishing or listening to a specific, named queue (or publish to a routing key), use the syntax shown below:


internal class JasperConfig : JasperOptions
{
    public JasperConfig()
    {
        Endpoints
            // Listen for messages incoming on a specific
            // named queue
            .ListenToRabbitQueue("pongs")

            // With the Rabbit MQ transport, you probably
            // want to explicitly designate a specific queue or topic
            // for replies
            .UseForReplies();

        Endpoints.PublishAllMessages()

            // The argument here can be either a queue
            // name or a routing key. It's the same as far
            // as the Rabbit MQ .Net Client is concerned
            .ToRabbit("pings");
    }

    public override void Configure(IHostEnvironment hosting, IConfiguration config)
    {
        Endpoints.ConfigureRabbitMq(rabbit =>
        {
            rabbit.ConnectionFactory.Uri = config.GetValue<Uri>("rabbit");
        });
    }
}

Or if you want to do this by Uri:


internal class JasperConfig2 : JasperOptions
{
    public JasperConfig2()
    {
        Endpoints
            // Listen for messages incoming on a specific
            // named queue
            .ListenToRabbitQueue("rabbitmq://queue/pongs")



            // With the Rabbit MQ transport, you probably
            // want to explicitly designate a specific queue or topic
            // for replies
            .UseForReplies();


        Endpoints.PublishAllMessages()
            // To a specific queue
            .To("rabbitmq://queue/pings");
    }

    public override void Configure(IHostEnvironment hosting, IConfiguration config)
    {
        Endpoints.ConfigureRabbitMq(rabbit =>
        {
            rabbit.ConnectionFactory.Uri = config.GetValue<Uri>("rabbit");
        });
    }
}

Please note that you will lose the option to configure Rabbit MQ-specific options by endpoint if you use the generic Uri approach.

Publish Messages to a specific Topic

Publishing to a specific topic can be done with this syntax:


internal class JasperConfig3 : JasperOptions
{
    public JasperConfig3()
    {
        Endpoints
            // Listen for messages incoming on a specific
            // named queue
            .ListenToRabbitQueue("pongs")

            // With the Rabbit MQ transport, you probably
            // want to explicitly designate a specific queue or topic
            // for replies
            .UseForReplies();

        Endpoints.PublishAllMessages()

            // The argument here can be either a queue
            // name or a routing key. It's the same as far
            // as the Rabbit MQ .Net Client is concerned
            .ToRabbit("pings", "topics");
    }

    public override void Configure(IHostEnvironment hosting, IConfiguration config)
    {
        Endpoints.ConfigureRabbitMq(rabbit =>
        {
            rabbit.ConnectionFactory.Uri = config.GetValue<Uri>("rabbit");

            if (hosting.IsDevelopment())
            {
                rabbit.AutoProvision = true;
            }

            // This is optional, but does help for local development
            rabbit.DeclareExchange("topics", exchange =>
            {
                exchange.ExchangeType = ExchangeType.Topic;
            });

            rabbit.DeclareQueue("incoming-pings", q =>
            {
                // Just showing that it's possible to further configure
                // the queue
                q.IsDurable = true;
            });

            // This would more likely be on the listener side,
            // but just showing you what can be done
            rabbit.DeclareBinding(new Binding
            {
                BindingKey = "pings",
                ExchangeName = "topics",
                QueueName = "incoming-pings"
            });
        });
    }
}

Please note that in the call to Endpoints.Publish****().ToRabbitMq(), the second argument refers to the Rabbit MQ exchange name and this must be specified to publish to a named topic.

Fanout Exchanges

Jasper.RabbitMQ supports using Rabbit MQ fanout exchanges as shown below:


internal class JasperConfig4 : JasperOptions
{
    public JasperConfig4()
    {
        Endpoints
            // Listen for messages incoming on a specific
            // named queue
            .ListenToRabbitQueue("pongs")

            // With the Rabbit MQ transport, you probably
            // want to explicitly designate a specific queue or topic
            // for replies
            .UseForReplies();

        // Publish to the exchange name
        Endpoints.PublishAllMessages()
            .ToRabbitExchange("fan");
    }

    public override void Configure(IHostEnvironment hosting, IConfiguration config)
    {
        Endpoints.ConfigureRabbitMq(rabbit =>
        {
            rabbit.ConnectionFactory.Uri = config.GetValue<Uri>("rabbit");

            if (hosting.IsDevelopment())
            {
                rabbit.AutoProvision = true;
            }

            // This is optional
            rabbit.DeclareExchange("fan", exchange =>
            {
                exchange.ExchangeType = ExchangeType.Fanout;
            });

            // This would more likely be on the listener side,
            // but just showing you what can be done
            rabbit.DeclareBinding(new Binding
            {
                BindingKey = "pings",
                ExchangeName = "fan",
                QueueName = "incoming-pings"
            });
        });
    }
}

Scheduled Messages

Jasper does not at this time support Rabbit MQ's plugin for delayed delivery. When using Rabbit MQ, the scheduled delivery function is done by polling the configured message store by Jasper rather than depending on Rabbit MQ itself.

See Scheduled Message Delivery and Execution for more information.

Connecting to non-Jasper Applications

Lastly, you may want to use the Rabbit MQ transport to integrate with other applications that aren't using Jasper. To make that work, you may need to do some mapping between Jasper's Envelope structure and Rabbit MQ's structures using a custom implementation of Jasper.RabbitMq.IRabbitMqProtocol.

That interface is shown below:


public interface IRabbitMqProtocol
{
    void WriteFromEnvelope(Envelope envelope, IBasicProperties properties);

    Envelope ReadEnvelope(byte[] body, IBasicProperties properties);
}

And here's what the default protocol looks like because it's likely easier to start with this than build something all new:


public class DefaultRabbitMqProtocol : IRabbitMqProtocol
{
    public virtual Envelope ReadEnvelope(byte[] data, IBasicProperties props)
    {
        var envelope = new Envelope
        {
            Data = data,
            Source = props.AppId,
            ContentType = props.ContentType,
            MessageType = props.Type,
            ReplyUri = props.ReplyTo.IsEmpty() ? null : new Uri(props.ReplyTo)
        };

        if (Guid.TryParse(props.CorrelationId, out var id))
        {
            envelope.Id = id;
        }

        if (props.Headers != null) envelope.ReadPropertiesFromDictionary(props.Headers);

        return envelope;
    }

    public virtual void WriteFromEnvelope(Envelope envelope, IBasicProperties properties)
    {
        properties.CorrelationId = envelope.Id.ToString();
        properties.AppId = envelope.Source;
        properties.ContentType = envelope.ContentType;
        properties.Type = envelope.MessageType;
        if (envelope.ReplyUri != null) properties.ReplyTo = envelope.ReplyUri.ToString();

        if (properties.Headers == null) properties.Headers = new Dictionary<string, object>();

        envelope.WriteToDictionary(properties.Headers);
    }
}

Lastly, to apply the protocol, use the mechanism shown in the previous section.

Auto-Provisioning Rabbit MQ Objects at Application Startup

One of the best things about developing against Rabbit MQ is how easy it is to set up your environment for local development. Run Rabbit MQ in a Docker container and spin up queues, exchanges, and bindings on the fly. Jasper.RabbitMQ uses a combination of the Declare***() methods and the AutoProvision property to direct Jasper to create any missing Rabbit MQ objects at application start up time.

Here's a sample usage:


internal class JasperConfig3 : JasperOptions
{
    public JasperConfig3()
    {
        Endpoints
            // Listen for messages incoming on a specific
            // named queue
            .ListenToRabbitQueue("pongs")

            // With the Rabbit MQ transport, you probably
            // want to explicitly designate a specific queue or topic
            // for replies
            .UseForReplies();

        Endpoints.PublishAllMessages()

            // The argument here can be either a queue
            // name or a routing key. It's the same as far
            // as the Rabbit MQ .Net Client is concerned
            .ToRabbit("pings", "topics");
    }

    public override void Configure(IHostEnvironment hosting, IConfiguration config)
    {
        Endpoints.ConfigureRabbitMq(rabbit =>
        {
            rabbit.ConnectionFactory.Uri = config.GetValue<Uri>("rabbit");

            if (hosting.IsDevelopment())
            {
                rabbit.AutoProvision = true;
            }

            // This is optional, but does help for local development
            rabbit.DeclareExchange("topics", exchange =>
            {
                exchange.ExchangeType = ExchangeType.Topic;
            });

            rabbit.DeclareQueue("incoming-pings", q =>
            {
                // Just showing that it's possible to further configure
                // the queue
                q.IsDurable = true;
            });

            // This would more likely be on the listener side,
            // but just showing you what can be done
            rabbit.DeclareBinding(new Binding
            {
                BindingKey = "pings",
                ExchangeName = "topics",
                QueueName = "incoming-pings"
            });
        });
    }
}