Jasper as Command Bus
TIP
The in-memory queueing feature is automatically enabled for all known message types in every Jasper application.
Jasper can be used as an in-memory command bus where messages are processed either immediately or through in-memory queues within your application. The queueing is built on objects from the TPL Dataflow library, so you have a fair amount of control over parallelization and even some back pressure. These local queues can be used directly, or as a transport that accepts messages sent through IMessagePublisher.SendAsync() or IMessagePublisher.PublishAsync() using the application's message routing rules.
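To make the back pressure remark concrete, here is a minimal sketch that uses a raw TPL Dataflow ActionBlock rather than Jasper itself. How Jasper wires its own blocks is not shown on this page, so the class name BackPressureSketch and the method CountAcceptedAsync are hypothetical and exist only for this illustration. The sketch blocks the single worker, then counts how many messages a bounded block accepts before it starts declining new ones.

```csharp
using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Hypothetical illustration; not part of Jasper.
public static class BackPressureSketch
{
    // Posts `total` messages to a block bounded at `capacity` while its
    // worker is held up, and returns how many messages were accepted.
    public static async Task<int> CountAcceptedAsync(int capacity, int total)
    {
        // The handler waits on this gate, so nothing finishes until we open it
        var gate = new TaskCompletionSource<bool>(
            TaskCreationOptions.RunContinuationsAsynchronously);

        Func<int, Task> handler = _ => gate.Task;

        var queue = new ActionBlock<int>(handler, new ExecutionDataflowBlockOptions
        {
            MaxDegreeOfParallelism = 1,
            // Counts messages that are buffered or currently being handled
            BoundedCapacity = capacity
        });

        var accepted = 0;
        for (var i = 0; i < total; i++)
        {
            // Post() returns false immediately when the block is full;
            // SendAsync() would instead wait until there is room
            if (queue.Post(i)) accepted++;
        }

        // Release the worker and let the block drain
        gate.SetResult(true);
        queue.Complete();
        await queue.Completion;

        return accepted;
    }
}
```

With a capacity of 2 and five messages, only the first two are accepted while the worker is stalled. A producer that awaits SendAsync() instead of calling Post() would be slowed down rather than turned away, which is the back pressure effect.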
Enqueueing Messages Locally
warning
The IMessagePublisher and IExecutionContext interfaces both extend the ICommandBus interface, so either one can be used wherever ICommandBus appears in these samples.
You can queue up messages to be executed locally and asynchronously in a background thread:
public static async Task enqueue_locally(ICommandBus bus)
{
    // Enqueue a message to the local worker queues
    await bus.EnqueueAsync(new Message1());
}
This feature is useful for asynchronous processing in web applications or really any kind of application where you need some parallelization or concurrency.
Some things to know about the local queues:
- Local worker queues can be durable, meaning that the enqueued messages are persisted first so that they aren't lost if the application is shut down before they're processed. More on that below.
- You can use any number of named local queues, and they don't even have to be declared upfront (you might want to be careful with that though).
- Local worker queues utilize Jasper's error handling policies to selectively handle any detected exceptions from the message handlers.
- You can control the priority and parallelization of each individual local queue.
- Message types can be routed to particular queues.
- Cascading messages can be used with the local queues.
- The local queues can be used like any other message transport and be the target of routing rules.
The Default Queue
Out of the box, each Jasper application has a default queue named "default". In the absence of any other routing rules, all messages enqueued to ICommandBus will be published to this queue. The default in-memory queue can be configured like this:
using var host = await Host.CreateDefaultBuilder()
    .UseJasper(opts =>
    {
        opts.DefaultLocalQueue.MaximumParallelMessages(3);
    }).StartAsync();
Local Message Routing
In the absence of any kind of routing rules, any message enqueued with ICommandBus.EnqueueAsync() will just be handled by the default local queue. To override that choice for an individual message type, you can use the [LocalQueue] attribute on that type:
[LocalQueue("important")]
public class ImportanceMessage
{
}
Otherwise, you can take advantage of Jasper's message routing rules like this:
using var host = await Host.CreateDefaultBuilder()
    .UseJasper(opts =>
    {
        // Publish Message2 messages to the "important"
        // local queue
        opts.PublishMessage<Message2>()
            .ToLocalQueue("important");
    }).StartAsync();
Routing rules and the [LocalQueue] attribute are also honored for cascading messages, meaning that any message handled inside a Jasper system can publish cascading messages to the local worker queues.
See message routing rules for more information.
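As a rough mental model of the attribute-based choice described above, the following sketch reads a queue name from an attribute through reflection and falls back to "default" when none is present. It does not reproduce Jasper's internals: SketchQueueAttribute, TaggedMessage, UntaggedMessage, and QueueNameResolver are all hypothetical names invented for this example, and how Jasper orders attributes against explicit routing rules is not covered here.

```csharp
using System;
using System.Reflection;

// Hypothetical stand-in for an attribute such as [LocalQueue]
[AttributeUsage(AttributeTargets.Class)]
public sealed class SketchQueueAttribute : Attribute
{
    public SketchQueueAttribute(string queueName) => QueueName = queueName;

    public string QueueName { get; }
}

[SketchQueue("important")]
public class TaggedMessage { }

public class UntaggedMessage { }

public static class QueueNameResolver
{
    public const string DefaultQueueName = "default";

    // Use the queue named by the attribute if the message type has one,
    // otherwise fall back to the default queue name
    public static string QueueNameFor(Type messageType)
    {
        var attribute = messageType.GetCustomAttribute<SketchQueueAttribute>();
        return attribute?.QueueName ?? DefaultQueueName;
    }
}
```

Calling QueueNameResolver.QueueNameFor(typeof(TaggedMessage)) yields "important", while an untagged type resolves to "default".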
Durable Local Messages
The local worker queues can optionally be designated as "durable," meaning that local messages are persisted until they are successfully processed. This guarantees that a message will still be processed if the running application faults or is shut down prematurely (assuming, of course, that other nodes are running or the application is restarted later).
Here is an example of configuring a local queue to be durable:
using var host = await Host.CreateDefaultBuilder()
    .UseJasper(opts =>
    {
        // Make the default local queue durable
        opts.DefaultLocalQueue.UseInbox();

        // Or make a specific local queue durable by name
        opts.LocalQueue("important")
            .UseInbox();
    }).StartAsync();
See Persistent Messaging for more information.
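The "persist first, remove only after success" idea behind a durable queue can be sketched in a few lines. This is a conceptual illustration under stated assumptions, not Jasper's persistence code: the InboxSketch class is hypothetical, and an in-memory dictionary stands in for whatever durable store a real application would use.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical illustration of an inbox; not part of Jasper.
public sealed class InboxSketch
{
    // Stands in for a durable store such as a database table
    private readonly Dictionary<Guid, string> _persisted =
        new Dictionary<Guid, string>();

    // Persist the message before any handler sees it
    public Guid Enqueue(string message)
    {
        var id = Guid.NewGuid();
        _persisted[id] = message;
        return id;
    }

    // Run the handler and delete the stored copy only on success
    public bool TryProcess(Guid id, Action<string> handler)
    {
        if (!_persisted.TryGetValue(id, out var message)) return false;

        try
        {
            handler(message);
        }
        catch (Exception)
        {
            // The message stays persisted so it can be retried later
            return false;
        }

        _persisted.Remove(id);
        return true;
    }

    // Messages that would be recovered after a restart
    public IReadOnlyList<string> Pending() => _persisted.Values.ToList();
}
```

A failed or interrupted handler leaves the message in Pending(), which is what allows it to be picked up again after a restart or by another node.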
Scheduling Local Execution
TIP
If you need scheduled commands to survive service restarts, you'll need to enable message persistence within Jasper.
The "scheduled execution" feature can be used with local execution within the same application. See Scheduled Messages for more information. Use the ICommandBus.ScheduleAsync() methods like this:
public async Task ScheduleLocally(IExecutionContext bus, Guid invoiceId)
{
    var message = new ValidateInvoiceIsNotLate
    {
        InvoiceId = invoiceId
    };

    // Schedule the message to be processed in a certain amount
    // of time
    await bus.ScheduleAsync(message, 30.Days());

    // Schedule the message to be processed at a certain time
    await bus.ScheduleAsync(message, DateTimeOffset.Now.AddDays(30));
}
Configuring Parallelization and Execution Properties
The queues are built on top of the TPL Dataflow library, so it's pretty easy to configure parallelization (how many messages a queue can handle concurrently). Here's an example of how to establish this:
using var host = await Host.CreateDefaultBuilder()
    .UseJasper(opts =>
    {
        // Force a local queue to be
        // strictly first in, first out
        // with no more than a single
        // thread handling messages enqueued
        // here
        // Use this option if message ordering is
        // important
        opts.LocalQueue("one")
            .Sequential();

        // Specify the maximum number of parallel threads
        opts.LocalQueue("two")
            .MaximumParallelMessages(5);

        // Or just edit the ActionBlock options directly
        opts.LocalQueue("three")
            .ConfigureExecution(options =>
            {
                options.MaxDegreeOfParallelism = 5;
                options.BoundedCapacity = 1000;
            });

        // And finally, this enrolls a queue into the persistent inbox
        // so that messages can happily be retained and processed
        // after the service is restarted
        opts.LocalQueue("four").UseInbox();
    }).StartAsync();
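To show what the underlying ActionBlock option does, here is a small sketch using TPL Dataflow directly. It assumes nothing about how Jasper maps Sequential() or MaximumParallelMessages() onto these options; ParallelismSketch and RunAsync are hypothetical names for this illustration. The method records the peak number of handlers running at once and the order in which messages start processing.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Hypothetical illustration; not part of Jasper.
public static class ParallelismSketch
{
    public static async Task<(int Peak, List<int> Order)> RunAsync(
        int maxParallel, int count)
    {
        var sync = new object();
        var running = 0;
        var peak = 0;
        var order = new List<int>();

        Func<int, Task> handler = async message =>
        {
            lock (sync)
            {
                running++;
                if (running > peak) peak = running;
                order.Add(message); // record the start order
            }

            await Task.Delay(20); // simulated work

            lock (sync)
            {
                running--;
            }
        };

        var queue = new ActionBlock<int>(handler, new ExecutionDataflowBlockOptions
        {
            // Upper bound on handlers running at the same time
            MaxDegreeOfParallelism = maxParallel
        });

        for (var i = 0; i < count; i++)
        {
            queue.Post(i);
        }

        queue.Complete();
        await queue.Completion;

        return (peak, order);
    }
}
```

With maxParallel set to 1 the peak is 1 and messages start in the order they were posted, which is the property a strictly sequential queue relies on. With a larger value the peak never exceeds the configured limit, but the order in which messages are handled is no longer guaranteed.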
Explicitly Enqueue to a Specific Local Queue
If you want to enqueue a message locally to a specific worker queue, you can use this syntax:
public ValueTask EnqueueToQueue(IExecutionContext bus)
{
    var @event = new InvoiceCreated
    {
        Time = DateTimeOffset.Now,
        Purchaser = "Guy Fieri",
        Amount = 112.34,
        Item = "Cookbook"
    };

    // Put this message in a local worker
    // queue named 'highpriority'
    return bus.EnqueueAsync(@event, "highpriority");
}
Local Queues as a Messaging Transport
warning
The local transport is used underneath the covers by Jasper for retrying locally enqueued messages or scheduled messages that may have initially failed.
In the sample Jasper configuration shown below:
using var host = await Host.CreateDefaultBuilder()
    .UseJasper(opts =>
    {
        // Publish Message2 messages to the "important"
        // local queue
        opts.PublishMessage<Message2>()
            .ToLocalQueue("important");
    }).StartAsync();
Calling IMessagePublisher.SendAsync(new Message2()) would publish the message to the local "important" queue.