Jasper as Command Bus

TIP

The in-memory queueing feature is automatically enabled for all known message types within all Jasper applications.

Jasper can be used as an in-memory command bus where messages can be processed either immediately or through in-memory queues within your application. The queueing is built on objects from the TPL Dataflow library. As such, you have a fair amount of control over parallelization and even some back pressure. These local queues can be used directly, or as a transport that accepts messages sent through IMessagePublisher.SendAsync() or IMessagePublisher.PublishAsync() according to the application's message routing rules.
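
To make the back pressure point concrete, here is a minimal sketch that uses plain TPL Dataflow and no Jasper API at all. It assumes a local queue behaves roughly like a bounded ActionBlock. The block, the capacity of 2, and every variable name are illustrative only and are not a description of how Jasper wires its queues internally.

```csharp
using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Illustration only: a bounded ActionBlock standing in for a local queue.
// The handler cannot finish until this task source is completed
var release = new TaskCompletionSource<bool>();
var handled = 0;

var block = new ActionBlock<string>(async message =>
{
    await release.Task;
    handled++;
}, new ExecutionDataflowBlockOptions
{
    // Counts messages waiting in the block plus the one being handled
    BoundedCapacity = 2
});

// The first two messages fit within the bounded capacity
var first = block.Post("one");
var second = block.Post("two");

// The block is full, so Post() declines the third message right away
var third = block.Post("three");

// SendAsync() waits for room instead of declining
var pending = block.SendAsync("four");
var pendingBeforeRelease = pending.IsCompleted;

// Let the handler finish, which frees capacity for the waiting message
release.SetResult(true);
var fourth = await pending;

block.Complete();
await block.Completion;

Console.WriteLine($"{first} {second} {third} {fourth}, handled {handled}");
```

In this sketch the third Post() call is declined while the block is full, and the message offered through SendAsync() is only accepted once the handler has freed some capacity. That is the kind of back pressure a bounded queue gives a fast producer.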

Enqueueing Messages Locally

WARNING

The IMessagePublisher and IExecutionContext interfaces both implement the ICommandBus interface as well.

You can queue up messages to be executed locally and asynchronously in a background thread:

public static async Task enqueue_locally(ICommandBus bus)
{
    // Enqueue a message to the local worker queues
    await bus.EnqueueAsync(new Message1());

}

This feature is useful for asynchronous processing in web applications or really any kind of application where you need some parallelization or concurrency.

Some things to know about the local queues:

  • Local worker queues can be durable, meaning that the enqueued messages are persisted first so that they aren't lost if the application is shut down before they're processed. More on that below.
  • You can use any number of named local queues, and they don't even have to be declared upfront (though you may want to be careful with that).
  • Local worker queues utilize Jasper's error handling policies to selectively handle any detected exceptions from the message handlers.
  • You can control the priority and parallelization of each individual local queue.
  • Message types can be routed to particular queues.
  • Cascading messages can be used with the local queues.
  • The local queues can be used like any other message transport and be the target of routing rules.

The Default Queue

Out of the box, each Jasper application has a default queue named "default". In the absence of any other routing rules, all messages enqueued to ICommandBus will be published to this queue. The default in-memory queue can be configured like this:

using var host = await Host.CreateDefaultBuilder()
    .UseJasper(opts =>
    {
        opts.DefaultLocalQueue.MaximumParallelMessages(3);
    }).StartAsync();

Local Message Routing

In the absence of any kind of routing rules, any message enqueued with ICommandBus.EnqueueAsync() will just be handled by the default local queue. To override that choice for an individual message type, you can use the [LocalQueue] attribute on that message type:

[LocalQueue("important")]
public class ImportanceMessage
{

}

Otherwise, you can take advantage of Jasper's message routing rules like this:

using var host = await Host.CreateDefaultBuilder()
    .UseJasper(opts =>
    {
        // Publish Message2 messages to the "important"
        // local queue
        opts.PublishMessage<Message2>()
            .ToLocalQueue("important");
    }).StartAsync();

The routing rules and the [LocalQueue] attribute are also honored for cascading messages, meaning that any message handled inside a Jasper system can publish cascading messages to the local worker queues.

See message routing rules for more information.

Durable Local Messages

The local worker queues can optionally be designated as "durable", meaning that local messages are persisted until they have been successfully processed. This guarantees that a message will still be processed if the running application faults or is shut down prematurely (assuming that other nodes are running or that the application is restarted later, of course).

Here is an example of configuring a local queue to be durable:

using var host = await Host.CreateDefaultBuilder()
    .UseJasper(opts =>
    {
        // Make the default local queue durable
        opts.DefaultLocalQueue.UseInbox();

        // Or make a specific local queue durable by name
        opts.LocalQueue("important")
            .UseInbox();
    }).StartAsync();

See Persistent Messaging for more information.

Scheduling Local Execution

TIP

If you need scheduled commands to survive service restarts, you'll need to enable message persistence within Jasper.

The "scheduled execution" feature can be used with local execution within the same application. See Scheduled Messages for more information. Use the ICommandBus.ScheduleAsync() methods like this:

public async Task ScheduleLocally(IExecutionContext bus, Guid invoiceId)
{
    var message = new ValidateInvoiceIsNotLate
    {
        InvoiceId = invoiceId
    };

    // Schedule the message to be processed in a certain amount
    // of time
    await bus.ScheduleAsync(message, 30.Days());

    // Schedule the message to be processed at a certain time
    await bus.ScheduleAsync(message, DateTimeOffset.Now.AddDays(30));
}

Configuring Parallelization and Execution Properties

The queues are built on top of the TPL Dataflow library, so it's pretty easy to configure parallelization (how many messages a queue can handle concurrently). Here's an example of how to configure this:

using var host = await Host.CreateDefaultBuilder()
    .UseJasper(opts =>
    {
        // Force a local queue to be
        // strictly first in, first out
        // with no more than a single
        // thread handling messages enqueued
        // here

        // Use this option if message ordering is
        // important
        opts.LocalQueue("one")
            .Sequential();

        // Specify the maximum number of parallel threads
        opts.LocalQueue("two")
            .MaximumParallelMessages(5);

        // Or just edit the ActionBlock options directly
        opts.LocalQueue("three")
            .ConfigureExecution(options =>
            {
                options.MaxDegreeOfParallelism = 5;
                options.BoundedCapacity = 1000;
            });

        // And finally, this enrolls a queue into the persistent inbox
        // so that messages can happily be retained and processed
        // after the service is restarted
        opts.LocalQueue("four").UseInbox();
    }).StartAsync();
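
To show what these execution settings mean at the TPL Dataflow level, the following sketch runs two plain ActionBlock instances side by side. It does not use Jasper. It assumes that a sequential queue corresponds to a MaxDegreeOfParallelism of 1 and that the options edited through ConfigureExecution() are the standard ExecutionDataflowBlockOptions, as the comments in the sample above suggest. All names here are hypothetical.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Illustration only: plain ActionBlocks standing in for local queues.
var gate = new object();

// One message at a time, which keeps handling in first in, first out order
var order = new List<int>();
var sequential = new ActionBlock<int>(async message =>
{
    await Task.Delay(5);
    lock (gate) { order.Add(message); }
}, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 1 });

// Up to five messages at once, tracking the highest concurrency observed
var running = 0;
var peak = 0;
var concurrent = new ActionBlock<int>(async message =>
{
    lock (gate) { running++; peak = Math.Max(peak, running); }
    await Task.Delay(20);
    lock (gate) { running--; }
}, new ExecutionDataflowBlockOptions
{
    MaxDegreeOfParallelism = 5,
    BoundedCapacity = 1000
});

for (var i = 0; i < 20; i++)
{
    sequential.Post(i);
    concurrent.Post(i);
}

// Signal that no more messages are coming, then wait for both to drain
sequential.Complete();
concurrent.Complete();
await Task.WhenAll(sequential.Completion, concurrent.Completion);

var inOrder = order.SequenceEqual(Enumerable.Range(0, 20));
Console.WriteLine($"In order: {inOrder}, peak concurrency: {peak}");
```

The sequential block handles messages strictly in the order they were posted, while the second block never runs more than five handlers at the same time. The exact peak it reaches depends on thread pool timing, so treat the printed number as an observation and not a guarantee.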

Explicitly Enqueue to a Specific Local Queue

If you want to enqueue a message locally to a specific worker queue, you can use this syntax:

public ValueTask EnqueueToQueue(IExecutionContext bus)
{
    var @event = new InvoiceCreated
    {
        Time = DateTimeOffset.Now,
        Purchaser = "Guy Fieri",
        Amount = 112.34,
        Item = "Cookbook"
    };

    // Put this message in a local worker
    // queue named 'highpriority'
    return bus.EnqueueAsync(@event, "highpriority");
}

Local Queues as a Messaging Transport

WARNING

The local transport is used under the covers by Jasper to retry locally enqueued messages or scheduled messages that may have initially failed.

Consider the sample Jasper configuration shown below:

using var host = await Host.CreateDefaultBuilder()
    .UseJasper(opts =>
    {
        // Publish Message2 messages to the "important"
        // local queue
        opts.PublishMessage<Message2>()
            .ToLocalQueue("important");
    }).StartAsync();

With this configuration, calling IMessagePublisher.SendAsync(new Message2()) publishes the message to the local "important" queue.