Using Marten with Jasper


Note! The Jasper.Persistence.Marten has a dependency on the lower level Jasper.Persistence.Postgresql Nuget library.

The Jasper.Persistence.Marten library provides some easy to use recipes for integrating Marten and Postgresql into a Jasper application. All you need to do to get started with Marten + Jasper is to add the Jasper.Persistence.Marten nuget to your project and at minimum, at least set the connection string to the underlying Postgresql database by configuring Marten's StoreOptions object like this:


public class AppWithMarten : JasperOptions
{
    public override void Configure(IHostEnvironment hosting, IConfiguration config)
    {
        // At the simplest, you would just need to tell Marten
        // the connection string to the application database
        Extensions.UseMarten(config.GetConnectionString("marten"));
    }
}

Note that ConfigureMarten() is an extension method in Jasper.Marten.

Once that's done, you will be able to inject the following Marten services as either constructor arguments or method parameters in message or HTTP handlers:

  1. IDocumentStore
  2. IDocumentSession - opened with the default IDocumentStore.OpenSession() method
  3. IQuerySession

Likewise, all of these service will be registered in the underlying IoC container for the application.

If you need to customize an IDocumentSession for something like transaction levels or automatic dirty checking, we recommend that you just take in IDocumentStore and create the session in the application code.

As an example:


public class UsingDocumentSessionHandler
{
    // Take in IDocumentStore as a constructor argument
    public UsingDocumentSessionHandler(IDocumentStore store)
    {
    }

    // Take in IDocumentSession as an argument
    public void Handle(Message1 message, IDocumentSession session)
    {
    }
}

Transactional Middleware

Assuming that the Jasper.Persistence.Marten Nuget is referenced by your project, you can explicitly apply transactional middleware to a message or HTTP handler action with the [Transactional] attribute as shown below:


public class CreateDocCommandHandler
{
    [Transactional]
    public void Handle(CreateDocCommand message, IDocumentSession session)
    {
        session.Store(new FakeDoc {Id = message.Id});
    }
}

Doing this will simply insert a call to IDocumentSession.SaveChangesAsync() after the last handler action is called within the generated MessageHandler. This effectively makes a unit of work out of all the actions that might be called to process a single message.

This attribute can appear on either the handler class that will apply to all the actions on that class, or on a specific action method.

If so desired, you can also use a policy to apply the Marten transaction semantics with a policy. As an example, let's say that you want every message handler where the message type name ends with "Command" to use the Marten transaction middleware. You could accomplish that with a handler policy like this:


public class CommandsAreTransactional : IHandlerPolicy
{
    public void Apply(HandlerGraph graph, GenerationRules rules, IContainer container)
    {
        // Important! Create a brand new TransactionalFrame
        // for each chain
        graph
            .Chains
            .Where(x => x.MessageType.Name.EndsWith("Command"))
            .Each(x => x.Middleware.Add(new TransactionalFrame()));
    }
}

Then add the policy to your application like this:


public class CommandsAreTransactionalApp : JasperOptions
{
    public CommandsAreTransactionalApp()
    {
        // And actually use the policy
        Handlers.GlobalPolicy<CommandsAreTransactional>();
    }
}

"Outbox" Pattern Usage

Using the Marten-backed persistence, you can take advantage of Jasper's implementation of the "outbox" pattern where outgoing messages are persisted as part of a native database transaction before being sent to the outgoing transports. The purpose of this pattern is to achieve guaranteed messaging and consistency between the outgoing messages and the current transaction without being forced to use distributed, two phase transactions between your application database and the outgoing queues like RabbitMQ.

To see the outbox pattern in action, consider this ASP.Net Core MVC controller action method:


public async Task<IActionResult> PostCreateUser(
    [FromBody] CreateUser user,
    [FromServices] IMessageContext context,
    [FromServices] IDocumentSession session)
{
    await context.EnlistInTransaction(session);

    session.Store(new User {Name = user.Name});

    var @event = new UserCreated {UserName = user.Name};

    await context.Publish(@event);

    await session.SaveChangesAsync();

    return Ok();
}

A couple notes here:

  • The IMessageContext.EnlistInTransaction(IDocumentSession) method is an extension method in the Jasper.Marten library. When it is called, it tells the IMessageContext to register any outgoing messages to be persisted by that IDocumentSession when the Marten session is saved
  • No messages will actually be placed into Jasper's outgoing, sender queues until the session is successfully saved
  • When the session is saved, the outgoing envelopes will be persisted in the same native Postgresql database, then actually sent to the outgoing transport sending agents

Using the outbox pattern, as long as your transaction is successfully committed, the outgoing messages will eventually be sent out, even if the running system somehow manages to get shut down between the transaction being committed and the messages being successfully sent to the recipients or even if the recipient services are temporarily down and unreachable.

The outbox usage is a little bit easier to use within a Jasper message handler action decorated with the [MartenTransaction] attribute as shown below:


[Transactional]
public static UserCreated Handle(CreateUser message, IDocumentSession session)
{
    session.Store(new User {Name = message.Name});

    return new UserCreated {UserName = message.Name};
}

By decorating the action with that attribute, Jasper.Marten will inject a little bit of code around that method to enlist the current message context into the current Marten IDocumentSession, and the outgoing UserCreated message would be persisted as an outgoing envelope when the session is successfully saved.

Saga Storage

See Stateful Sagas for an introduction to stateful sagas within Jasper.

To use Marten as the backing store for saga persistence, start by enabling the Marten message persistence like this:


public class MartenSagaApp : JasperOptions
{
    public override void Configure(IHostEnvironment hosting, IConfiguration config)
    {
        // This example pulls the connection string to the underlying Postgresql
        // database from configuration
        Extensions.UseMarten(config.GetConnectionString("connectionString"));
    }
}

Any message handlers within a StatefulSagaOf<T> class will automatically have the transactional middleware support applied. The limitation here is that you have to allow Jasper.Marten to handle all the transactional boundaries.

The saga state documents are all persisted as Marten documents.

Customizing How the Session is Created

By default, using [Transactional] or just injecting an IDocumentSession with the Marten integration will create a lightweight session in Marten using the IDocumentStore.LightweightSession() call. However, Marten has many other options to create sessions with different transaction levels, heavier identity map behavior, or by attaching custom listeners. To allow you to use the full range of Marten behavior, you can choose to override the mechanics of how a session is opened for any given message handler by just placing a method called OpenSession() on your handler class that returns an IDocumentSession. If Jasper sees that method exists, it will call that method to create your session.

Here's an example from the tests:


public class SessionUsingBlock2
{
    // This method will be used to create the IDocumentSession
    // that will be used by the two Consume() methods below
    public IDocumentSession OpenSession(IDocumentStore store)
    {
        // Here I'm opting to use the heavier,
        // automatic dirty state checking
        return store.DirtyTrackedSession();
    }


    public void Consume(Message1 message, IDocumentSession session)
    {
    }

    [Transactional]
    public void Consume(Message2 message, IDocumentSession session)
    {
    }
}