Local Worker Queues


Jasper's Execution Pipeline can be fed from in-memory queues within your application. The queueing is built around ActionBlock objects from the TPL Dataflow library, so you have a fair amount of control over parallelization and even some back pressure. These local queues can be used directly, or as a transport that honors the application's message routing rules.
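To make the back pressure remark concrete, here is a minimal sketch that uses only the TPL Dataflow library itself, not Jasper. The class name BackPressureSketch and the gate used to hold a message "in flight" are hypothetical and exist only for illustration; how Jasper wires up its own ActionBlock internally is not shown here.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Illustration only: a plain ActionBlock standing in for a local queue
public static class BackPressureSketch
{
    public static async Task<(bool firstAccepted, bool secondAccepted, bool sendCompletedEarly, int processed)> RunAsync()
    {
        var processed = 0;

        // The handler waits on this gate so the first message stays in flight
        var gate = new TaskCompletionSource<bool>(
            TaskCreationOptions.RunContinuationsAsynchronously);

        var block = new ActionBlock<string>(async message =>
        {
            await gate.Task;
            Interlocked.Increment(ref processed);
        },
        new ExecutionDataflowBlockOptions
        {
            MaxDegreeOfParallelism = 1,

            // Counts queued messages plus the one being processed
            BoundedCapacity = 1
        });

        // Accepted: the block has room for one message
        var firstAccepted = block.Post("a");

        // Declined: the block is full, so Post() returns false immediately
        var secondAccepted = block.Post("b");

        // SendAsync() waits for room instead of giving up
        var pending = block.SendAsync("b");
        var sendCompletedEarly = pending.IsCompleted;

        // Let the handler finish, which frees capacity for "b"
        gate.SetResult(true);
        await pending;

        block.Complete();
        await block.Completion;

        return (firstAccepted, secondAccepted, sendCompletedEarly, processed);
    }
}
```

Awaiting BackPressureSketch.RunAsync() shows the two behaviors a bounded block offers to a producer: Post() fails fast when the block is full, while SendAsync() makes the producer wait until capacity frees up.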

Enqueueing Messages Locally

Note! The IMessagePublisher and IMessageContext interfaces both extend the ICommandBus interface. Truth be told, there is just one underlying concrete class, and the interfaces simply expose narrower or broader sets of options.

Using the ICommandBus.Enqueue() method, you can queue up messages to be executed asynchronously:


public static async Task enqueue_locally(ICommandBus bus)
{
    // Enqueue a message to the local worker queues
    await bus.Enqueue(new Message1());
}

This feature is useful for asynchronous processing in web applications or really any kind of application where you need some parallelization or concurrency.

Some things to know about the local queues:

  • Local worker queues can be durable, meaning that the enqueued messages are persisted first so that they aren't lost if the application is shut down before they're processed. More on that below.
  • You can use any number of named local queues, and they don't even have to be declared upfront (you might want to be careful with that, though).
  • Local worker queues utilize Jasper's Error Handling policies to selectively handle any exceptions thrown by the message handlers.
  • You can control the priority and parallelization of each individual local queue.
  • Message types can be routed to particular queues.
  • Cascading messages can be used with the local queues.
  • The local queues can be used like any other message transport and be the target of routing rules.

Default Queues

Out of the box, each Jasper application has a default local queue named "default". In the absence of any other routing rules, all messages enqueued through ICommandBus.Enqueue() are published to this queue.

Local Message Routing

Without any routing rules, any message enqueued with ICommandBus.Enqueue() is handled by the default local queue. To override that choice for a specific message type, apply the [LocalQueue] attribute to the message class:


[LocalQueue("important")]
public class ImportanceMessage
{

}

Otherwise, you can take advantage of the routing rules on the JasperOptions.Endpoints.Publish() method like this:


public class LocalTransportApp : JasperOptions
{
    public LocalTransportApp()
    {
        // Publish Message2 messages to the
        // "important" local queue
        Endpoints.Publish(x =>
        {
            x.Message<Message2>();
            x.ToLocalQueue("important");
        });
    }
}

The routing rules and the [LocalQueue] attribute are also honored for cascading messages, meaning that any message handled inside a Jasper system can publish cascading messages to the local worker queues.
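As a rough sketch of what a cascading message looks like in handler code, the example below assumes Jasper's conventional handler discovery (a public Handle method on a class whose name ends in "Handler") and assumes that a message object returned from that method is treated as a cascading message. The Message1 and Message2 classes are empty stand-ins for the types used in the samples above, and Message1Handler is a hypothetical name.

```csharp
// Stand-in message types for illustration
public class Message1
{
}

public class Message2
{
}

// Hypothetical handler, assumed to be found by naming convention
public class Message1Handler
{
    // The returned Message2 is assumed to be published as a
    // cascading message once Message1 has been handled
    public Message2 Handle(Message1 message)
    {
        return new Message2();
    }
}
```

Under those assumptions, and with the LocalTransportApp routing rule shown earlier in place, the returned Message2 would land in the local "important" queue rather than the default one.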

Durable Local Messages

The local worker queues can optionally be designated as "durable," meaning that local messages are persisted until they have been successfully processed. This guarantees that a message is still processed even if the running application faults or is shut down prematurely (assuming, of course, that other nodes are running or that the application is restarted later).

Here is an example of configuring a local queue to be durable:


public class LocalDurableTransportApp : JasperOptions
{
    public LocalDurableTransportApp()
    {
        // Make the default local queue durable
        Endpoints.DefaultLocalQueue.Durable();

        // Or make a specific queue durable by name
        Endpoints
            .LocalQueue("important")
            .Durable();
    }
}

See Durable Messaging and Command Processing for more information.

Scheduling Local Execution

The "scheduled execution" feature can be used with local execution within the same application. See Scheduled Message Delivery and Execution for more information. Use the ICommandBus.Schedule() methods like this:


public async Task ScheduleLocally(IMessageContext bus, Guid invoiceId)
{
    var message = new ValidateInvoiceIsNotLate
    {
        InvoiceId = invoiceId
    };

    // Schedule the message to be processed in a certain amount
    // of time
    await bus.Schedule(message, 30.Days());

    // Schedule the message to be processed at a certain time
    await bus.Schedule(message, DateTime.UtcNow.AddDays(30));
}

Configuring Parallelization and Execution Properties

The queues are built on top of the TPL Dataflow library, so it's pretty easy to configure parallelization (how many messages a queue can handle concurrently). Here's an example of how to configure this:


public class LocalQueuesApp : JasperOptions
{
    public LocalQueuesApp()
    {
        // Force a local queue to be strictly first in,
        // first out, with no more than a single thread
        // handling messages enqueued here.
        // Use this option if message ordering is important
        Endpoints
            .LocalQueue("one")
            .Durable()
            .Sequential();

        // Allow up to 5 messages from this queue
        // to be handled concurrently
        Endpoints
            .LocalQueue("two")
            .MaximumThreads(5);

        // Or just edit the ActionBlock options directly
        Endpoints.LocalQueue("three")
            .ConfigureExecution(options =>
            {
                options.MaxDegreeOfParallelism = 5;
                options.BoundedCapacity = 1000;
            });
    }
}
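The trade-off between a sequential queue and a parallel one can be seen with a bare ActionBlock from TPL Dataflow. This is a sketch under the assumption that Sequential() and MaximumThreads(n) translate into MaxDegreeOfParallelism values of 1 and n; that mapping is not confirmed by the samples above. ParallelismSketch and its members are hypothetical names.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Illustration only: a plain ActionBlock standing in for a local queue
public static class ParallelismSketch
{
    // Returns the order in which messages started processing and
    // the highest number of messages that were in flight at once
    public static async Task<(List<int> startOrder, int peak)> RunAsync(int maxParallelism, int count)
    {
        var startOrder = new List<int>();
        var sync = new object();
        int running = 0, peak = 0;

        var block = new ActionBlock<int>(async n =>
        {
            lock (sync)
            {
                running++;
                if (running > peak) peak = running;
                startOrder.Add(n);
            }

            // Simulate some asynchronous work in the handler
            await Task.Delay(20);

            lock (sync)
            {
                running--;
            }
        },
        new ExecutionDataflowBlockOptions
        {
            MaxDegreeOfParallelism = maxParallelism
        });

        for (var i = 0; i < count; i++)
        {
            block.Post(i);
        }

        // Stop accepting messages and wait for the queue to drain
        block.Complete();
        await block.Completion;

        return (startOrder, peak);
    }
}
```

With maxParallelism set to 1, messages start strictly in the order they were posted and never overlap. With a higher value, up to that many messages may be in flight at once, and the order in which they finish is no longer guaranteed, which is why a sequential queue is the safer choice when ordering matters.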

Explicitly Enqueue to a Specific Local Queue

If you want to enqueue a message locally to a specific worker queue, you can use this syntax:


public Task EnqueueToQueue(IMessageContext bus)
{
    var @event = new InvoiceCreated
    {
        Time = DateTime.UtcNow,
        Purchaser = "Guy Fieri",
        Amount = 112.34,
        Item = "Cookbook"
    };

    // Put this message in a local worker
    // queue named 'highpriority'
    return bus.Enqueue(@event, "highpriority");
}

Local Queues as a Messaging Transport

Note! Jasper uses the local transport under the covers to retry locally enqueued or scheduled messages that may have initially failed.

Consider the sample Jasper configuration shown below:


public class LocalTransportApp : JasperOptions
{
    public LocalTransportApp()
    {
        // Publish Message2 messages to the
        // "important" local queue
        Endpoints.Publish(x =>
        {
            x.Message<Message2>();
            x.ToLocalQueue("important");
        });
    }
}

With this configuration, calling IMessagePublisher.Send(new Message2()) publishes the message to the local "important" queue.