JasperFx 0.8.2


Next

Handling an Unknown Message

Previous

Error Handling

Cascading Messages Edit on GitHub


Many times during the processing of a message you will need to create and send out other messages. Maybe you need to respond back to the original sender with a reply, maybe you need to trigger a subsequent action, or send out additional messages to start some kind of background processing. You can do that by just having your handler class use the IServiceBus interface as shown in this sample:


public class NoCascadingHandler
{
    private readonly IMessageContext _bus;

    public NoCascadingHandler(IMessageContext bus)
    {
        _bus = bus;
    }

    public void Consume(MyMessage message)
    {
        // do whatever work you need to for MyMessage,
        // then send out a new MyResponse
        _bus.Send(new MyResponse());
    }
}

The code above certainly works and this is consistent with most of the competing service bus tools. However, Jasper supports the concept of cascading messages that allow you to automatically send out objects returned from your handler methods without having to use IServiceBus as shown below:


public class CascadingHandler
{
    public MyResponse Consume(MyMessage message)
    {
        return new MyResponse();
    }
}

When Jasper executes CascadingHandler.Consume(MyMessage), it "knows" that the MyResponse return value should be sent through the service bus as part of the same transaction with whatever routing rules apply to MyResponse. A couple things to note here:

  • Cascading messages returned from handler methods will not be sent out until after the original message succeeds and is part of the underlying transport transaction
  • Null's returned by handler methods are simply ignored
  • There is a significant performance advantage to using cascading messages instead of explicitly calling IServiceBus.Send() if you are using the LightningQueues transport
  • The cascading message feature was explicitly designed to make unit testing handler actions easier by shifting the test strategy to state-based where you mostly need to verify the state of the response objects instead of mock-heavy testing against calls to IServiceBus.

Request/Reply Scenarios

Normally, cascading messages are just sent out according to the configured subscription rules for that message type, but there's an exception case. If the original sender requested a response, Jasper will automatically send the cascading messages returned from the action to the original sender if the cascading message type matches the reply that the sender had requested. If you're examining the Envelope objects for the message, you'll see that the "reply-requested" header is "MyResponse."

Let's say that we have two running service bus nodes named "Sender" and "Receiver." If this code below is called from the "Sender" node:


public class Requester
{
    private readonly IMessageContext _bus;

    public Requester(IMessageContext bus)
    {
        _bus = bus;
    }

    public Task<MyResponse> GatherResponse()
    {
        return _bus.Request<MyResponse>(new MyMessage());
    }
}

and inside Receiver we have this code:


public class CascadingHandler
{
    public MyResponse Consume(MyMessage message)
    {
        return new MyResponse();
    }
}

Assuming that MyMessage is configured to be sent to "Receiver," the following steps take place:

  1. Sender sends a MyMessage message to the Receiver node with the "reply-requested" header value of "MyResponse"
  2. Receiver handles the MyMessage message by calling the CascadingHandler.Consume(MyMessage) method
  3. Receiver sees the value of the "reply-requested" header matches the response, so it sends the MyResponse object back to Sender
  4. When Sender receives the matching MyResponse message that corresponds to the original MyMessage, it sets the completion back to the Task returned by the IServiceBus.Request<TResponse>() method

Conditional Responses

You may need some conditional logic within your handler to know what the cascading message is going to be. If you need to return different types of cascading messages based on some kind of logic, you can still do that by making your handler method return signature be object like this sample shown below:


public class ConditionalResponseHandler
{
    public object Consume(DirectionRequest request)
    {
        switch (request.Direction)
        {
            case "North":
                return new GoNorth();
            case "South":
                return new GoSouth();
        }

        // This does nothing
        return null;
    }
}

Delayed Messages

You may want to raise a delayed message by using the DelayedResponse class as shown below:


public class ScheduledResponseHandler
{
    public ScheduledResponse Consume(DirectionRequest request)
    {
        // Process GoWest in 5 minutes from now
        return new ScheduledResponse(new GoWest(), TimeSpan.FromMinutes(5));
    }

    public ScheduledResponse Consume(MyMessage message)
    {
        // Process GoEast at 8 PM local time
        return new ScheduledResponse(new GoEast(), DateTime.Today.AddHours(20));
    }
}

Multiple Cascading Messages

You can also raise any number of cascading messages by returning either any type that can be cast to IEnumerable<object>, and Jasper will treat each element as a separate cascading message. An empty enumerable is just ignored.


public class MultipleResponseHandler
{
    public IEnumerable<object> Consume(MyMessage message)
    {
        // Go North now
        yield return new GoNorth();

        // Go West in an hour
        yield return new ScheduledResponse(new GoWest(), TimeSpan.FromHours(1));
    }
}

Send Message Back to the Sender

If you want to send a message right back to the original sender node, you can return your message wrapped by the RespondToSender type:


public class BackToSenderHandler
{
    public RespondToSender Consume(MyMessage message)
    {
        return new RespondToSender(new GoWest());
    }
}

Send Message to a Specific Channel

If you need to trigger a message that needs to be sent to a specific node and want to bypass the normal subscription routing, you can use the SendDirectlyTo wrapper like this:


public class GoDirectlyHandler
{
    public static readonly Uri ChannelAddress
        = "lq.tcp://localhost:2200/service".ToUri();

    public SendDirectlyTo Consume(MyMessage message)
    {
        // Send the GoWest message to the running node
        // at a given Uri
        return new SendDirectlyTo(ChannelAddress, new GoWest());
    }
}

Fluent Interface for Controlling Responses

Jasper also supports a limited fluent interface to more exactly control how the cascading message should be handled:


public class RespondsHandler
{
    public Response Consume(MyMessage message)
    {
        return Respond
            // The actual message being send back out
            .With(new GoEast())

            // Delay the processing by a timespan
            .DelayedBy(TimeSpan.FromDays(1))

            // Delay the processing until a given time
            .DelayedUntil(DateTime.Today.AddDays(3))

            // Send this message directly to the originator
            // of the original MyMessage
            .ToSender()

            // Finally, directly modify the Envelope for
            // the outgoing message for rarely used options
            .Altered(envelope =>
            {
                // Do any alterations you'd want to the outgoing message

                // Override the serialization maybe?
                envelope.ContentType = "application/json";

                envelope.Destination
                    = new Uri("lq.tcp://localhost:2201/system");
            });
    }
}

Custom Cascading Message Behavior

Finally, you can create your own custom response behavior by creating your own implementation of ISendMyself and returning that wrapper object from your handler methods:


public class SpecialGoWest : ISendMyself
{
    public Envelope CreateEnvelope(Envelope original)
    {
        return new Envelope
        {
            Message = new GoWest(),
            Destination = original.ReplyUri,
            ContentType = "text/xml"
        };
    }
}

DelayedResponse, RespondToSender, and SendDirectlyTo are implementations of ISendMyself.

Using C# Tuples as Return Values

Sometimes you may well need to return multiple cascading messages from your original message action. In FubuMVC, Jasper's forebear, you had to return either object[] or IEnumerable<object> as the return type of your action -- which had the unfortunate side effect of partially obfuscating your code by making it less clear what message types were being cascaded from your handler without carefully reading the message body. In Jasper, we still support the "mystery meat" object return value signatures, but now you can also use C# tuples to better denote the cascading message types.

This handler cascading a pair of messages:


public class MultipleResponseHandler
{
    public IEnumerable<object> Consume(MyMessage message)
    {
        // Go North now
        yield return new GoNorth();

        // Go West in an hour
        yield return new ScheduledResponse(new GoWest(), TimeSpan.FromHours(1));
    }
}

can be rewritten with C# 7 tuples to:


public class TupleResponseHandler
{
    public (GoNorth, ScheduledResponse) Consume(MyMessage message)
    {
        return (new GoNorth(), new ScheduledResponse(new GoWest(), TimeSpan.FromHours(1)));
    }
}

The sample above still treats both GoNorth and the ScheduledResponse as cascading messages. The Jasper team thinks that the tuple-ized signature makes the code more self-documenting and easier to unit test.