Aggregates, Events, Repositories


This use case demonstrates how to capture state changes in events and then replay that state from the database. Some supporting infrastructure is introduced first, and a model of an invoice, together with its invoice lines, is then implemented on top of it.

Scenario

To model, capture and replay the state of an object through events, some infrastructure is established to dispatch events to their respective handlers. This is demonstrated in the AggregateBase class below - it serves as the basis for objects whose state is to be modeled.


using System;
using System.Collections.Generic;

// Infrastructure to capture modifications to state in events
public abstract class AggregateBase
{
    // For indexing our event streams
    public string Id { get; protected set; }
    // For protecting the state, i.e. conflict prevention
    public int Version { get; protected set; }

    private readonly List<object> uncommittedEvents = new List<object>();
    private readonly Dictionary<Type, Action<object>> handlers = new Dictionary<Type, Action<object>>();
    
    // Get the deltas, i.e. events that make up the state, not yet persisted
    public IEnumerable<object> GetUncommittedEvents()
    {
        return uncommittedEvents;
    }

    // Mark the deltas as persisted.
    public void ClearUncommittedEvents()
    {
        uncommittedEvents.Clear();            
    }

    // Infrastructure for raising events & registering handlers

    protected void Register<T>(Action<T> handle)
    {
        handlers[typeof(T)] = e => handle((T)e);
    } 

    protected void RaiseEvent(object @event)
    {
        ApplyEvent(@event);
        uncommittedEvents.Add(@event);
    }

    private void ApplyEvent(object @event)
    {
        handlers[@event.GetType()](@event);
        // Each event bumps our version
        Version++;
    }
}

With the first piece of infrastructure implemented, two events are introduced to capture the state changes of an invoice. The first records the creation of an invoice together with its invoice number; the second records the addition of a line to an invoice:


public sealed class InvoiceCreated
{
    public readonly int InvoiceNumber;

    public InvoiceCreated(int invoiceNumber)
    {
        InvoiceNumber = invoiceNumber;
    }
}

public sealed class LineItemAdded
{
    public readonly decimal Price;
    public readonly decimal Vat;
    public readonly string Description;

    public LineItemAdded(decimal price, decimal vat, string description)
    {
        Price = price;
        Vat = vat;
        Description = description;
    }
}

With the events in place to represent the deltas of an invoice, an aggregate is implemented on top of the infrastructure presented above to create and replay state from those events.


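// Requires: using System; using System.Collections.Generic; using System.Globalization; using System.Linq;
public sealed class Invoice : AggregateBase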
{
    public Invoice(int invoiceNumber) : this()
    {
        if (invoiceNumber <= 0)
        {
            throw new ArgumentException("Invoice number needs to be positive", nameof(invoiceNumber));
        }

        // Instantiation creates our initial event, capturing the invoice number
        RaiseEvent(new InvoiceCreated(invoiceNumber));
    }

    // Enforce any contracts on input, then raise an event capturing the data
    public void AddLine(decimal price, decimal vat, string description)
    {
        if (string.IsNullOrEmpty(description))
        {
            throw new ArgumentException("Description cannot be empty", nameof(description));
        }

        RaiseEvent(new LineItemAdded(price, vat, description));
    }

    public override string ToString()
    {
        var lineItems = string.Join(Environment.NewLine, lines.Select(x => $"{x.Item1}: {x.Item2} ({x.Item3}% VAT)"));
        return $"{lineItems}{Environment.NewLine}Total: {Total}";
    }

    public decimal Total { get; private set; }
    private readonly List<Tuple<string, decimal, decimal>> lines = new List<Tuple<string, decimal, decimal>>();

    private Invoice()
    {
        // Register the event types that make up our aggregate, together with their respective handlers
        Register<InvoiceCreated>(Apply);
        Register<LineItemAdded>(Apply);            
    }

    // Apply the deltas to mutate our state
    private void Apply(InvoiceCreated @event)
    {
        Id = @event.InvoiceNumber.ToString(CultureInfo.InvariantCulture);
    }

    // Apply the deltas to mutate our state
    private void Apply(LineItemAdded @event)
    {
        var price = @event.Price * (1 + @event.Vat / 100);
        Total += price;
        lines.Add(Tuple.Create(@event.Description, price, @event.Vat));
    }
}

The implemented invoice protects its state by not exposing mutable data, and it enforces its contracts through argument validation. Once a valid state modification is introduced, either through the constructor (which numbers the invoice and captures that in an event) or through the Invoice.AddLine method, a corresponding event capturing that data is recorded.

Lastly, to persist the deltas described above and to replay the state of an object from the persisted data, a repository is implemented. The repository pushes the deltas of an object to an event stream, indexed by the ID of the object.


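// Requires: using System; using System.Collections.Generic; using System.Linq; using System.Reflection;
// plus Marten's namespaces for IDocumentStore and IEvent
public sealed class AggregateRepository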
{
    private readonly IDocumentStore store;

    public AggregateRepository(IDocumentStore store)
    {
        this.store = store;
    }

    public void Store(AggregateBase aggregate)
    {
        using (var session = store.OpenSession())
        {
            // Take non-persisted events, push them to the event stream, indexed by the aggregate ID
            var events = aggregate.GetUncommittedEvents().ToArray();
            session.Events.Append(aggregate.Id, aggregate.Version, events);
            session.SaveChanges();
        }
        // Once successfully persisted, clear events from the list of uncommitted events
        aggregate.ClearUncommittedEvents();            
    }        

    private static readonly MethodInfo ApplyEvent = typeof(AggregateBase).GetMethod("ApplyEvent", BindingFlags.Instance | BindingFlags.NonPublic);

    public T Load<T>(string id, int? version = null) where T : AggregateBase
    {
        IReadOnlyList<IEvent> events;
        using (var session = store.LightweightSession())
        {
            events = session.Events.FetchStream(id, version ?? 0);                
        }

        if (events != null && events.Any())
        {                                
            var instance = Activator.CreateInstance(typeof(T), true);                
            // Replay our aggregate state from the event stream, one event at a time
            foreach (var @event in events)
            {
                ApplyEvent.Invoke(instance, new[] { @event.Data });
            }
            return (T)instance;
        }

        throw new InvalidOperationException($"No aggregate by id {id}.");
    }
}

With the last infrastructure component in place, versioned invoices can now be created, persisted and hydrated through Marten. For this purpose, first an invoice is created:


var invoice = new Invoice(42);

invoice.AddLine(100, 24, "Joo Janta 200 Super-Chromatic Peril Sensitive Sunglasses");
invoice.AddLine(200, 16, "Happy Vertical People Transporter");

Then, with an instantiated & configured Document Store (in this case with string as event stream identity) a repository is bootstrapped. The newly created invoice is then passed to the repository, which pushes the deltas to the database and clears them from the to-be-committed list of changes. Once persisted, the invoice data is replayed from the database and verified to match the data of the original item.


var repository = new AggregateRepository(theStore);

repository.Store(invoice);

var invoiceFromRepository = repository.Load<Invoice>(invoice.Id);

Assert.Equal(invoice.ToString(), invoiceFromRepository.ToString());           
Assert.Equal(invoice.Total, invoiceFromRepository.Total);
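
The setup of the theStore instance used above is not shown here. As a rough sketch, a store with string stream identities might be configured as follows. The StoreSetup class, the CreateStore method and the connectionString parameter are illustrative names rather than part of the original example, and the snippet assumes the Marten package and a reachable PostgreSQL database.

```csharp
using Marten;
using Marten.Events;

// Hypothetical helper, for illustration only
public static class StoreSetup
{
    // connectionString is a placeholder; supply one for your own PostgreSQL database
    public static IDocumentStore CreateStore(string connectionString)
    {
        return DocumentStore.For(options =>
        {
            options.Connection(connectionString);
            // Identify event streams by string, so the invoice number can serve as the stream ID
            options.Events.StreamIdentity = StreamIdentity.AsString;
        });
    }
}
```

Without the string identity setting, the aggregate's string Id would not be usable as a stream key, which is why the example calls it out.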

With this infrastructure in place and the ability to model change as events, it is also possible to replay any previous state of the object. For example, it is possible to see what the invoice looked like with only the first line added. Version 2 of the stream consists of the creation event and the first line item:


var invoiceFromRepository = repository.Load<Invoice>(invoice.Id, 2);

Assert.Equal(124, invoiceFromRepository.Total);
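
To make the arithmetic behind that assertion explicit, here is a minimal in-memory sketch of replaying a stream up to a given version. It does not use Marten. LineAdded, ReplaySketch, TotalAtVersion and ExampleStream are hypothetical names introduced only for this illustration, and the creation event is represented by a placeholder string.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical stand-in for LineItemAdded, reduced to the fields needed here
public sealed class LineAdded
{
    public readonly decimal Price;
    public readonly decimal Vat;

    public LineAdded(decimal price, decimal vat)
    {
        Price = price;
        Vat = vat;
    }
}

public static class ReplaySketch
{
    // Fold the first `version` events of a stream into a total.
    // Events that are not line items (such as the creation event) leave the total unchanged.
    public static decimal TotalAtVersion(IEnumerable<object> stream, int version)
    {
        return stream
            .Take(version)
            .OfType<LineAdded>()
            .Sum(e => e.Price * (1 + e.Vat / 100));
    }

    // The stream from the example above: creation, then two line items
    public static List<object> ExampleStream()
    {
        return new List<object>
        {
            "InvoiceCreated(42)",      // placeholder for the creation event
            new LineAdded(100m, 24m),  // 100 * 1.24 = 124
            new LineAdded(200m, 16m)   // 200 * 1.16 = 232
        };
    }
}
```

Calling ReplaySketch.TotalAtVersion(ReplaySketch.ExampleStream(), 2) folds only the creation event and the first line, giving 124. With version 3 the second line is included and the total becomes 356.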

Lastly, to prevent the invoice from getting into a conflicted state, the Version property of the aggregate is used to assert that the state of the object has not changed between replaying its state and introducing new deltas:


var invoice = CreateInvoice();
var invoiceWithSameIdentity = CreateInvoice();

repository.Store(invoice);

Assert.Throws<EventStreamUnexpectedMaxEventIdException>(() =>
{
    repository.Store(invoiceWithSameIdentity);
});
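
The idea behind this check can be sketched with a small in-memory store. This is not how Marten implements it. InMemoryEventStore and VersionOf are hypothetical names, and the sketch assumes that the expected version is the version the stream should have after the append, mirroring how the repository above passes aggregate.Version, which already counts the uncommitted events.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical in-memory store, only to illustrate the version check
public sealed class InMemoryEventStore
{
    private readonly Dictionary<string, List<object>> streams = new Dictionary<string, List<object>>();

    // expectedVersion: the version the stream is expected to have once the events are appended
    public void Append(string streamId, int expectedVersion, params object[] events)
    {
        List<object> stream;
        var currentVersion = streams.TryGetValue(streamId, out stream) ? stream.Count : 0;

        // Someone else appended in the meantime if the versions do not line up
        if (currentVersion + events.Length != expectedVersion)
        {
            throw new InvalidOperationException(
                $"Stream {streamId} is at version {currentVersion}; cannot reach expected version {expectedVersion}.");
        }

        if (stream == null)
        {
            stream = new List<object>();
            streams[streamId] = stream;
        }

        stream.AddRange(events);
    }

    public int VersionOf(string streamId)
    {
        List<object> stream;
        return streams.TryGetValue(streamId, out stream) ? stream.Count : 0;
    }
}
```

With this sketch, appending three events to stream "42" with an expected version of 3 succeeds. A second append of three events with the same expected version is rejected, because the stream would end up at version 6. That corresponds to the situation in the test above, where two invoices with the same identity are stored one after the other.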