Using Postgresql with Jasper
The Jasper.Persistence.Postgresql Nuget library provides Jasper users with a quick way to integrate Postgresql-backed persistence into their Jasper applications. To get started, just add the Jasper.Persistence.Postgresql Nuget to your project, and enable the persistence like this:
public class AppUsingPostgresql : JasperOptions
{
    public AppUsingPostgresql()
    {
        // If you know the connection string
        Extensions.PersistMessagesWithPostgresql("your connection string", "my_app_schema");
    }

    public override void Configure(IHostEnvironment hosting, IConfiguration config)
    {
        // Or, if you need access to the application configuration and hosting
        // environment, work directly with PostgresqlBackedPersistence
        Extensions.Include<PostgresqlBackedPersistence>(ext =>
        {
            if (hosting.IsDevelopment())
            {
                // The hosting and config arguments give you access to
                // both the IHostEnvironment and the IConfiguration
                // of the running application, so you could do
                // environment-specific configuration here
            }

            ext.Settings.ConnectionString = config["postgresql"];

            // If your application uses a schema other than the default
            ext.Settings.SchemaName = "my_app_schema";
        });
    }
}
Enabling this configuration adds several things to your system:
- Service registrations in your IoC container for DbConnection and NpgsqlConnection, with the Scoped lifecycle
- "Outbox" pattern support as demonstrated below
- Message persistence using your application's Postgresql database, including outbox support with Postgresql
- Support for the [Transactional] attribute as shown below
Transactional Middleware
Assuming that the Jasper.Persistence.Postgresql Nuget is referenced by your project, you can use the [Transactional] attribute on message (or HTTP) handler methods to wrap the message handling inside a single Postgresql transaction like so:
public class ItemCreatedHandler
{
    [Transactional]
    public static async Task Handle(
        ItemCreated created,
        NpgsqlTransaction tx, // the current transaction
        Envelope envelope)
    {
        // Using some extension method helpers inside of Jasper here
        await tx.CreateCommand("insert into receiver.item_created (id, name) values (@id, @name)")
            .With("id", created.Id)
            .With("name", created.Name)
            .ExecuteNonQueryAsync();
    }
}
When you use this middleware, be sure to pull in the current NpgsqlTransaction
object as a parameter to your handler method.
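For readers who want to see roughly what the handler above amounts to without Jasper's command helper extensions, here is a minimal sketch that uses only Npgsql's own ADO.NET types. The shape of the `ItemCreated` record (a `Guid` id and a `string` name) and the `PlainNpgsqlExample` class are assumptions made for illustration; only the SQL text is taken from the sample above.

```csharp
#nullable enable
using System;
using System.Threading.Tasks;
using Npgsql;

// Hypothetical message type; the real ItemCreated may be shaped differently
public record ItemCreated(Guid Id, string Name);

public static class PlainNpgsqlExample
{
    public const string InsertSql =
        "insert into receiver.item_created (id, name) values (@id, @name)";

    // Builds the parameterized insert and, when a transaction is supplied,
    // attaches the command to that transaction and its connection
    public static NpgsqlCommand BuildInsert(ItemCreated created, NpgsqlTransaction? tx = null)
    {
        var cmd = new NpgsqlCommand(InsertSql, tx?.Connection, tx);
        cmd.Parameters.AddWithValue("id", created.Id);
        cmd.Parameters.AddWithValue("name", created.Name);
        return cmd;
    }

    // Executes the insert inside the caller's transaction; committing or
    // rolling back stays the caller's (or the middleware's) responsibility
    public static async Task InsertAsync(ItemCreated created, NpgsqlTransaction tx)
    {
        await using var cmd = BuildInsert(created, tx);
        await cmd.ExecuteNonQueryAsync();
    }
}
```

The point of the sketch is that the command never commits anything itself: it only joins the transaction it is handed, which is why the handler needs the current NpgsqlTransaction as a parameter.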
"Outbox" Pattern Usage
See Jasper’s “Outbox” Pattern Support for more context around why you would care about the "outbox" pattern.
Jasper supports the "outbox" pattern with Postgresql connections. You can explicitly opt into this usage with code like this:
using (var conn = new NpgsqlConnection(Servers.PostgresConnectionString))
{
    await conn.OpenAsync();

    var tx = conn.BeginTransaction();

    // "context" is an IMessageContext object
    await context.EnlistInTransaction(tx);

    await action(context);

    tx.Commit();

    await context.SendAllQueuedOutgoingMessages();
}
If you use the [Transactional] middleware in a message handler, the middleware will take care of some of the repetitive mechanics for you. In the code below, the IMessageContext is enrolled in the current transaction before the action runs, and the outgoing messages are flushed into the outgoing sending queue after the action runs.
[Transactional]
public async Task<ItemCreatedEvent> Handle(CreateItemCommand command, NpgsqlTransaction tx)
{
    var item = new Item {Name = command.Name};

    // persist the new Item with the
    // current transaction
    await persist(tx, item);

    return new ItemCreatedEvent {Item = item};
}
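To make the ordering in this section concrete, here is a toy, in-memory sketch of the outbox idea: messages sent while enlisted in a transaction are held back, and they only reach the transport once the caller reports a successful commit. `ToyOutbox` and `ToyOutboxDemo` are hypothetical names and are not Jasper's IMessageContext; in particular, the sketch keeps pending messages in memory only and does not attempt the database persistence of outgoing envelopes described in this document.

```csharp
using System;
using System.Collections.Generic;

// Toy stand-in for a message context; NOT Jasper's IMessageContext
public sealed class ToyOutbox
{
    private readonly List<string> _pending = new List<string>();
    private readonly Action<string> _transport;
    private bool _enlisted;

    public ToyOutbox(Action<string> transport) => _transport = transport;

    // After enlisting, sends are held back instead of going straight out
    public void EnlistInTransaction() => _enlisted = true;

    public void Send(string message)
    {
        if (_enlisted) _pending.Add(message);
        else _transport(message);
    }

    // Call only after the database transaction has committed
    public void SendAllQueuedOutgoingMessages()
    {
        foreach (var message in _pending) _transport(message);
        _pending.Clear();
        _enlisted = false;
    }

    // Call when the transaction rolls back: nothing is ever sent
    public void DiscardQueuedMessages()
    {
        _pending.Clear();
        _enlisted = false;
    }
}

public static class ToyOutboxDemo
{
    // Returns the messages that actually reached the transport
    public static List<string> Run(bool commitSucceeds)
    {
        var sent = new List<string>();
        var outbox = new ToyOutbox(sent.Add);

        outbox.EnlistInTransaction();
        outbox.Send("ItemCreatedEvent"); // queued, not yet sent

        if (commitSucceeds) outbox.SendAllQueuedOutgoingMessages();
        else outbox.DiscardQueuedMessages();

        return sent;
    }
}
```

Calling `ToyOutboxDemo.Run(true)` delivers the one queued message, while `ToyOutboxDemo.Run(false)` delivers none, which is the guarantee the pattern is after: no message leaves the application for work that was rolled back.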
Message Persistence Schema
The message persistence requires and adds these tables to your schema:
- jasper_incoming_envelopes - stores incoming and scheduled envelopes until they are successfully processed
- jasper_outgoing_envelopes - stores outgoing envelopes until they are successfully sent through the transports
- jasper_dead_letters - stores "dead letter" envelopes that could not be processed. See Dead Letter Envelopes for more information
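When diagnosing a stuck or failing system, it can help to check how many rows sit in each of these tables. The following is a minimal sketch using plain Npgsql; `EnvelopeStorageProbe` is a hypothetical helper and not part of Jasper. It assumes the tables live in the schema you configured (for example "my_app_schema") and that the schema and table names come from trusted configuration, since they are interpolated into the SQL text.

```csharp
#nullable enable
using System.Threading.Tasks;
using Npgsql;

// Hypothetical diagnostic helper; not part of Jasper
public static class EnvelopeStorageProbe
{
    // Table names as listed in the documentation above
    public static readonly string[] Tables =
    {
        "jasper_incoming_envelopes",
        "jasper_outgoing_envelopes",
        "jasper_dead_letters"
    };

    // Builds a schema-qualified row count query.
    // Only pass trusted, configured names here, never user input.
    public static string CountSql(string schema, string table) =>
        $"select count(*) from \"{schema}\".\"{table}\"";

    // Opens a connection and returns the number of rows in one table
    public static async Task<long> CountAsync(string connectionString, string schema, string table)
    {
        await using var conn = new NpgsqlConnection(connectionString);
        await conn.OpenAsync();

        await using var cmd = new NpgsqlCommand(CountSql(schema, table), conn);
        var result = await cmd.ExecuteScalarAsync();

        // count(*) is a bigint in Postgresql, which Npgsql returns as a long
        return (long)result!;
    }
}
```

A steadily growing count in jasper_dead_letters, for instance, would suggest that messages are repeatedly failing to process.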
Managing the Postgresql Schema
In testing, you can build, or rebuild, the message storage in your system with a call to the RebuildMessageStorage() extension method off of either IWebHost or IJasperHost, as shown below in a sample taken from xUnit integration testing with Jasper:
public class MyJasperAppFixture : IDisposable
{
    public MyJasperAppFixture()
    {
        Host = JasperHost.For<MyJasperApp>();

        // This extension method will blow away any existing
        // schema items for message persistence in your configured
        // database and then rebuild the message persistence objects
        // before the *first* integration test runs
        Host.RebuildMessageStorage();
    }

    public IHost Host { get; }

    public void Dispose()
    {
        Host?.Dispose();
    }
}

// An xUnit test class that uses our MyJasperAppFixture
public class IntegrationTester : IClassFixture<MyJasperAppFixture>
{
    private readonly MyJasperAppFixture _fixture;

    public IntegrationTester(MyJasperAppFixture fixture)
    {
        _fixture = fixture;
    }
}
See this GitHub issue for some utilities to better manage the database objects.