Fork me on GitHub

Custom Projections Edit on GitHub


Multistream Projections using ViewProjection

The ViewProjection class is an implementation of the IProjection that can handle building a projection from multiple streams.

This can be setup from configuration like:


StoreOptions(_ =>
{
    _.AutoCreateSchemaObjects = AutoCreate.All;
    _.Events.InlineProjections.AggregateStreamsWith<QuestParty>();
    _.Events.ProjectView<PersistedView, Guid>()
        .ProjectEvent<ProjectionEvent<QuestStarted>>((view, @event) => { view.Events.Add(@event.Data); view.StreamIdsForEvents.Add(@event.StreamId); })
        .ProjectEvent<MembersJoined>(e => e.QuestId, (view, @event) => { view.Events.Add(@event); })
        .ProjectEvent<ProjectionEvent<MonsterSlayed>>(e => e.Data.QuestId, (view, @event) => { view.Events.Add(@event.Data); view.StreamIdsForEvents.Add(@event.StreamId); })
        .DeleteEvent<QuestEnded>()
        .DeleteEvent<MembersDeparted>(e => e.QuestId)
        .DeleteEvent<MonsterDestroyed>((session, e) => session.Load<QuestParty>(e.QuestId).Id);
});
or through a class like:

public class PersistViewProjection: ViewProjection<PersistedView, Guid>
{
    public PersistViewProjection()
    {
        ProjectEvent<QuestStarted>(Persist);
        ProjectEvent<MembersJoined>(e => e.QuestId, Persist);
        ProjectEvent<MonsterSlayed>((session, e) => session.Load<QuestParty>(e.QuestId).Id, Persist);
        DeleteEvent<QuestEnded>();
        DeleteEvent<MembersDeparted>(e => e.QuestId);
        DeleteEvent<MonsterDestroyed>((session, e) => session.Load<QuestParty>(e.QuestId).Id);
    }

    private void Persist<T>(PersistedView view, T @event)
    {
        view.Events.Add(@event);
    }
}


ProjectEvent by default takes two parameters:

  • property from event that will be used as projection document selector,
  • apply method that describes the projection by itself.

DeleteEvent takes the first parameter - as by the nature of this method it's only needed to select which document should be deleted.

Both methods may also select multiple Ids:

  • ProjectEvent if a List<TId> is passed, the handler method will be called for each Id in the collection.
  • DeleteEvent if a List<TId> is passed, then each document tied to the Id in the collection will be removed.

Each of these methods take various overloads that allow selecting the Id field implicitly, through a property or through two different Funcs Func<IDocumentSession, TEvent, TId> and Func<TEvent, TId>.

Warning:
Projection class needs to have:
  • public default constructor,
  • Id property with public getter or setter.
It comes of the way how Marten handles projection mechanism:
  1. Try to find document that has the same Id as the value of the property selected from event (so eg. for UserCreated event it will be UserId).
  2. If such document exists, then new record needs to be created. It's done using default constructor.
  3. If document with such Id was found then it's being loaded from database.
  4. Document is updated with the defined in ViewProjection logic (using expression from second ProjectEvent parameter).
  5. Created or updated document is upserted to database.

Using event meta data

If additional Marten event details are needed, then events can use the ProjectionEvent<> generic when setting them up with ProjectEvent. ProjectionEvent exposes the Marten Id, Version, Timestamp and Data.



public class Lap
{
    public Guid Id { get; set; }

    public DateTimeOffset? Start { get; set; }

    public DateTimeOffset? End { get; set; }
}

public class LapStarted
{
    public Guid LapId { get; set; }
}

public class LapFinished
{
    public Guid LapId { get; set; }
}

public class LapViewProjection: ViewProjection<Lap, Guid>
{
    public LapViewProjection()
    {
        ProjectEvent<ProjectionEvent<LapStarted>>(e => e.Data.LapId, Persist);
        ProjectEvent<ProjectionEvent<LapFinished>>(e => e.Data.LapId, Persist);
    }

    private void Persist(Lap view, ProjectionEvent<LapStarted> eventData)
    {
        view.Start = eventData.Timestamp;
    }

    private void Persist(Lap view, ProjectionEvent<LapFinished> eventData)
    {
        view.End = eventData.Timestamp;
    }
}


Injecting helpers classes

ViewProjections instances are created (by default) during the DocumentStore initialization. Marten gives also possible to register them with factory method. With such registration projections are created on runtime during the events application. Thanks to that it's possible to setup custom creation logic or event connect dependency injection mechanism.


StoreOptions(_ =>
{
    _.AutoCreateSchemaObjects = AutoCreate.All;
    _.Events.TenancyStyle = tenancyStyle;
    _.Events.InlineProjections.AggregateStreamsWith<QuestParty>();
    _.Events.InlineProjections.Add(() => new PersistViewProjectionWithInjection(logger));
});

By convention it's needed to provide the default constructor with projections definition and other with code injection (that calls the default constructor).


public class PersistViewProjectionWithInjection: PersistViewProjection
{
    private readonly Logger logger;

    public PersistViewProjectionWithInjection() : base()
    {
        ProjectEvent<QuestPaused>(@event => @event.QuestId, LogAndPersist);
    }

    public PersistViewProjectionWithInjection(Logger logger) : this()
    {
        this.logger = logger;
    }

    private void LogAndPersist<T>(PersistedView view, T @event)
    {
        logger.Log($"Handled {typeof(T).Name} event: {@event.ToString()}");
        view.Events.Add(@event);
    }
}


Using async projections

It's also possible to use async version of ProjectEvent. Using ProjectEventAsync gives possibility to call the async apis (from Marten or other frameworks) to get better resources utilization.

Sample usage could be loading other document/projection to create denormalized view.



// Customer main aggregate
public class Customer
{
    public Guid Id { get; set; }

    public string FullName { get; set; }
}

// Event informing that customer full name was updated
public class CustomerFullNameUpdated
{
    public Guid CustomerId { get; set; }

    public string FullName { get; set; }
}

// Bank Account main aggregate
public class BankAccount
{
    public Guid Id { get; set; }

    // normalized reference with id to related aggregate
    public Guid CustomerId { get; set; }

    public string Number { get; set; }
}

//Bank Account created event with normalized data
public class BankAccountCreated
{
    public Guid BankAccountId { get; set; }

    public Guid CustomerId { get; set; }

    public string Number { get; set; }
}

// Denormalized read model with full data of related document
public class BankAccountView
{
    public Guid Id { get; set; }

    // Full info about customer instead of just CustomerId
    public Customer Customer { get; set; }

    public string Number { get; set; }
}

public class BankAccountViewProjection: ViewProjection<BankAccountView, Guid>
{
    public BankAccountViewProjection()
    {
        ProjectEventAsync<BankAccountCreated>(e => e.BankAccountId, PersistAsync);

        // one customer might have more than one account
        Func<IDocumentSession, CustomerFullNameUpdated, List<Guid>> selectCustomerBankAccountIds =
            (ds, @event) => ds.Query<BankAccountView>()
                              .Where(a => a.Customer.Id == @event.CustomerId)
                              .Select(a => a.Id).ToList();

        ProjectEvent<CustomerFullNameUpdated>(selectCustomerBankAccountIds, Persist);
    }

    private async Task PersistAsync
    (
        IDocumentSession documentSession,
        BankAccountView view,
        BankAccountCreated @event
    )
    {
        // load asynchronously document to use it in denormalized view
        var customer = await documentSession.LoadAsync<Customer>(@event.CustomerId);

        view.Customer = customer;
        view.Number = @event.Number;
    }

    private void Persist(BankAccountView view, CustomerFullNameUpdated @event)
    {
        view.Customer.FullName = @event.FullName;
    }
}