Sunday, July 01, 2012

Massaging the Messages - The Right Way

Message processing is an unavoidable piece of work in any integration project. When software systems talk to each other they use messages, and the messaging interface is generally tightly managed. Message processing can have different priorities depending on what the messages are trying to achieve and under what circumstances. In this post I’m interested in event messaging in the context of a system that is more concerned with reliability than with efficiency. Let’s just say it’s a critical, non-real-time system.

In general, integration projects take time. They also involve a lot of complexity and interdependencies, so the natural way to build them is phase by phase. In each phase the messaging interface grows in complexity, and depending on the learnings from the previous phase it can change a fair bit as well.

That’s why a well-designed message processing component should be robust, i.e. it should absorb changes in the message interface (both semantic and syntactic) without heavy costs.

In addition, a critical feature of this kind of message processing component is to be fail-proof, i.e. to respond gracefully to failures and carry on with its work. Sometimes it’s also required that failures are reported and that corrective procedures are facilitated.

In this post I’d like to share some simple patterns that help achieve the above.

Syntactic Robustness


What I mean here is that the core of your message processing engine should not care if the format of the messages changes over time. You need to separate the message parsing (deserializing) part from the message processing part.

In an enterprise environment this can be achieved by delegating these aspects to middleware, which manages the transformation of messages from one system to another. In the absence of middleware, you need a separate mapping component that maps messages to your internal entities. Messages could be XML documents, database records, SOAP objects or anything else.
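The following minimal sketch shows such a mapping component. It assumes the upstream system sends XML like `<event type="DocumentEdited" documentId="42"/>`, which is a made-up format. `IMessageMapper`, `XmlMessageMapper` and the attribute names are hypothetical. The mapper is the only code that touches XML. Everything downstream works with the internal `Message` entity.

```csharp
using System;
using System.Xml.Linq;

// Internal entity: the only shape the processing engine ever sees.
public class Message
{
    public string Type { get; set; } = "";
    public int DocumentId { get; set; }
}

// Hypothetical mapping contract: one implementation per wire format (XML, DB row, SOAP object...).
public interface IMessageMapper<TRaw>
{
    Message Map(TRaw raw);
}

// Maps an assumed XML format such as <event type="DocumentEdited" documentId="42"/>.
public class XmlMessageMapper : IMessageMapper<string>
{
    public Message Map(string raw)
    {
        XElement root = XElement.Parse(raw);
        return new Message
        {
            // A missing 'type' is a format error; a missing documentId defaults to 0.
            Type = (string)root.Attribute("type") ?? throw new FormatException("missing 'type' attribute"),
            DocumentId = (int?)root.Attribute("documentId") ?? 0
        };
    }
}
```

Suppose the upstream system later switches to SOAP or database rows. In that case, only a new `IMessageMapper` implementation is needed. The processors keep working against `Message`.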

Semantic Robustness with message processing

Here we are more concerned about the actual meaning of the message content.

Most integration projects start out as fairly simple and straightforward integrations. But with time they become quite complex, and if the initial design is not resilient enough, the code base can end up a mess quite quickly.

An example is a distributed workflow implementation. The actual workflow might be running in a different system (say, a control system) and the downstream systems are required to perform different actions based on different workflow events. The first phase of the integration might support a couple of critical workflow events, but with time there will be more and more integration between the two.

That’s why I feel it’s a good idea to start with some form of software pattern which puts you in a good place once the changes start rolling in. A good candidate would be ‘Strategy Pattern’ or some derivative of it.

You can start by defining an interface that captures the message processor’s responsibility, and a data structure to store instances of that interface.


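interface IMessageProcessor
{
    void Process(Message message);
}

// A list keeps processors in registration order (Hashtable is not generic and needs a key)
List<IMessageProcessor> Processors = new List<IMessageProcessor>();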


In phase 1 you might have only two implementations, such as:

class ProcessorA : IMessageProcessor { /* handles workflow event A */ }
class ProcessorB : IMessageProcessor { /* handles workflow event B */ }

Processors.Add(new ProcessorA());
Processors.Add(new ProcessorB());



In addition, you need a manager class that routes each message to the appropriate message processor. To do this it has to do some preprocessing on the message.
It would be great if we could delegate the preprocessing to the processors themselves, i.e. each processor is responsible for figuring out whether a message should be processed by it. We need to enhance the interface as follows:


interface IMessageProcessor
{
    void Process(Message message);
    bool IsValid(Message message);
}



And the manager class might have a routine like:

foreach (IMessageProcessor p in Processors)
{
    if (p.IsValid(message))
    {
        p.Process(message);
    }
}
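Here is a runnable sketch of the phase-1 setup and the routing loop. It assumes each message carries a `Type` string that processors inspect in `IsValid`. `ProcessorA` and `ProcessorB` claim the hypothetical `"TaskCreated"` and `"TaskClosed"` events, and they simply record what they handled. `MessageManager` is an illustrative name for the manager class.

```csharp
using System;
using System.Collections.Generic;

public class Message
{
    public string Type { get; set; } = "";
}

public interface IMessageProcessor
{
    void Process(Message message);
    bool IsValid(Message message);
}

// Each processor decides for itself whether a message is its concern.
public class ProcessorA : IMessageProcessor
{
    public List<string> Handled { get; } = new List<string>();
    public bool IsValid(Message message) => message.Type == "TaskCreated";
    public void Process(Message message) => Handled.Add(message.Type);
}

public class ProcessorB : IMessageProcessor
{
    public List<string> Handled { get; } = new List<string>();
    public bool IsValid(Message message) => message.Type == "TaskClosed";
    public void Process(Message message) => Handled.Add(message.Type);
}

// The manager knows nothing about message semantics; it only routes.
public class MessageManager
{
    private readonly List<IMessageProcessor> processors = new List<IMessageProcessor>();

    public void Register(IMessageProcessor processor) => processors.Add(processor);

    public void Dispatch(Message message)
    {
        foreach (IMessageProcessor p in processors)
        {
            if (p.IsValid(message))
            {
                p.Process(message);
            }
        }
    }
}
```

In this sketch, a message that no processor claims is silently ignored. Whether that is acceptable, or should be logged, is a design decision for your system.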

However, in most instances the preprocessing has to happen within a context.
E.g. a document edit message/event needs to be processed in the context of the current state of the document.

So we can enhance our interface like this:

interface IMessageProcessor
{
    void Process(Message message, Context context);
    bool IsValid(Message message, Context context);
}
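Below is a minimal sketch of context-aware processing. It assumes the `Context` carries the current state of each document. `DocumentState` and the `"DocumentEdited"` event are made-up examples, not part of any particular system. So are the rules used here: edits to an archived document are ignored, and an edit moves a document back to draft.

```csharp
using System;
using System.Collections.Generic;

public enum DocumentState { Draft, Published, Archived }

public class Message
{
    public string Type { get; set; } = "";
    public int DocumentId { get; set; }
}

// Hypothetical context: whatever state a processor needs to interpret a message.
public class Context
{
    public Dictionary<int, DocumentState> Documents { get; } = new Dictionary<int, DocumentState>();
}

public interface IMessageProcessor
{
    void Process(Message message, Context context);
    bool IsValid(Message message, Context context);
}

public class DocumentEditProcessor : IMessageProcessor
{
    // Valid only for edits to known documents that are not archived.
    public bool IsValid(Message message, Context context) =>
        message.Type == "DocumentEdited"
        && context.Documents.TryGetValue(message.DocumentId, out var state)
        && state != DocumentState.Archived;

    // Illustrative rule: an edited document goes back to draft.
    public void Process(Message message, Context context) =>
        context.Documents[message.DocumentId] = DocumentState.Draft;
}
```

The same message can be valid or invalid depending on the context. The decision stays inside the processor rather than leaking into the manager.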


What we have here is a very simple yet robust message processing engine with:
1. Extensibility (Open/Closed Principle)
- Supporting a new scenario means adding a new class (class ProcessorC : IMessageProcessor) without modifying existing ones.
2. Modifiability
- Changes stay local to a particular scenario (if you want to send an SMS to the document owner as part of ProcessorB, you only touch the ProcessorB class).
3. Testability
- It’s fairly easy to write and maintain a test suite for each processor.
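To illustrate extensibility and testability together, here is a sketch of a later-phase `ProcessorC`. It notifies a document owner when a hypothetical `"DocumentApproved"` event arrives. `INotifier` is an assumed abstraction over something like an SMS gateway. It is injected so that a test can substitute `FakeNotifier`, which only records calls. No existing class has to change to add this scenario.

```csharp
using System;
using System.Collections.Generic;

public class Message
{
    public string Type { get; set; } = "";
    public string Owner { get; set; } = "";
}

public class Context { }

public interface IMessageProcessor
{
    void Process(Message message, Context context);
    bool IsValid(Message message, Context context);
}

// Hypothetical side-effect boundary (e.g. an SMS gateway), injected so it can be faked in tests.
public interface INotifier
{
    void Send(string recipient, string text);
}

// A new scenario added in a later phase: no existing class is modified.
public class ProcessorC : IMessageProcessor
{
    private readonly INotifier notifier;

    public ProcessorC(INotifier notifier) => this.notifier = notifier;

    public bool IsValid(Message message, Context context) => message.Type == "DocumentApproved";

    public void Process(Message message, Context context) =>
        notifier.Send(message.Owner, "Your document was approved.");
}

// Test double that records calls instead of sending anything.
public class FakeNotifier : INotifier
{
    public List<(string Recipient, string Text)> Sent { get; } = new List<(string, string)>();

    public void Send(string recipient, string text) => Sent.Add((recipient, text));
}
```

Registering it is one more `Processors.Add(new ProcessorC(...))` line. The manager loop stays untouched.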

Fail Proof

Here what I’m concerned about is what happens when you can’t process a message. There are endless reasons why your well-intentioned message processing engine might fail to process a message. Rather than dwell on why, let’s see how to design it so that life goes on even if a message (or two) fails.

One decision to be taken here is whether to hand over the responsibility for failure handling to the manager class or to each processor. If your failure handling turns out to be complex and differs between processors (even in a few cases), you will end up enhancing the interface as follows, with Process now reporting success or failure:


interface IMessageProcessor
{
    bool Process(Message message, Context context);
    bool IsValid(Message message, Context context);
    void HandleFailure(Message message);
}

foreach (IMessageProcessor p in Processors)
{
    // Only a processor that accepted the message can have failed to process it
    if (p.IsValid(message, context) && !p.Process(message, context))
    {
        p.HandleFailure(message);
    }
}
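Here is a runnable sketch of the per-processor variant. `UploadProcessor` is a hypothetical processor whose downstream call may be unavailable. It reports failure through the `bool` returned by `Process`, and its own `HandleFailure` queues the message for a retry. Only a processor that claimed the message through `IsValid` gets to handle a failure.

```csharp
using System;
using System.Collections.Generic;

public class Message
{
    public string Type { get; set; } = "";
}

public class Context { }

public interface IMessageProcessor
{
    bool Process(Message message, Context context);
    bool IsValid(Message message, Context context);
    void HandleFailure(Message message);
}

// Hypothetical processor whose downstream call can fail; it reports failure via the return value.
public class UploadProcessor : IMessageProcessor
{
    public bool DownstreamAvailable { get; set; } = true;
    public List<Message> Retries { get; } = new List<Message>();

    public bool IsValid(Message message, Context context) => message.Type == "FileUploaded";

    public bool Process(Message message, Context context) => DownstreamAvailable;

    // Processor-specific recovery: here, queue the message for a later retry.
    public void HandleFailure(Message message) => Retries.Add(message);
}

public static class Manager
{
    public static void Dispatch(IEnumerable<IMessageProcessor> processors, Message message, Context context)
    {
        foreach (IMessageProcessor p in processors)
        {
            // A processor that did not claim the message is skipped, not treated as failed.
            if (p.IsValid(message, context) && !p.Process(message, context))
            {
                p.HandleFailure(message);
            }
        }
    }
}
```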


If the failure handling is generic, there’s no need to change the interface, and the manager class can look like the following. The requirement here is that processor classes must not swallow errors internally; they should let exceptions propagate to the manager.


foreach (IMessageProcessor p in Processors)
{
    try
    {
        if (p.IsValid(message, context))
        {
            p.Process(message, context);
        }
    }
    catch (Exception ex)
    {
        // Generic handling: log ex, notify someone, etc.
    }
}
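Below is a sketch of the generic variant. The try/catch sits inside the loop, so an exception from one processor does not stop the remaining processors from seeing the message. `BrokenProcessor` and `CountingProcessor` are hypothetical stand-ins. Collecting failures in a list is an assumed placeholder for whatever logging or alerting your system uses.

```csharp
using System;
using System.Collections.Generic;

public class Message
{
    public string Type { get; set; } = "";
}

public class Context { }

public interface IMessageProcessor
{
    void Process(Message message, Context context);
    bool IsValid(Message message, Context context);
}

// Hypothetical processor that always throws, standing in for any unexpected error.
public class BrokenProcessor : IMessageProcessor
{
    public bool IsValid(Message message, Context context) => true;
    public void Process(Message message, Context context) =>
        throw new InvalidOperationException("downstream system unavailable");
}

public class CountingProcessor : IMessageProcessor
{
    public int Count { get; private set; }
    public bool IsValid(Message message, Context context) => true;
    public void Process(Message message, Context context) => Count++;
}

public class MessageManager
{
    private readonly List<IMessageProcessor> processors;

    // Generic failure handling: failures are just collected here; a real system might log or alert.
    public List<(Message Failed, Exception Error)> Failures { get; } = new List<(Message, Exception)>();

    public MessageManager(List<IMessageProcessor> processors) => this.processors = processors;

    public void Dispatch(Message message, Context context)
    {
        foreach (IMessageProcessor p in processors)
        {
            try
            {
                if (p.IsValid(message, context))
                {
                    p.Process(message, context);
                }
            }
            catch (Exception ex)
            {
                // One failing processor must not stop the others.
                Failures.Add((message, ex));
            }
        }
    }
}
```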


In either case, one specific requirement of failure handling is that a failed message must not stop subsequent messages from being processed.

Suppose you fetch messages for processing as follows:

var message = locator.FetchAll<Message>().Where(m => !m.Processed).OrderBy(m => m.CreatedDate).FirstOrDefault();
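Here is a self-contained version of that query. The post's `locator` is replaced by a hypothetical in-memory `MessageStore`, and a real implementation would query a database instead. `FirstOrDefault` returns `null` when nothing is pending, which avoids the exception `First` throws on an empty sequence.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public class Message
{
    public int Id { get; set; }
    public bool Processed { get; set; }
    public DateTime CreatedDate { get; set; }
}

// In-memory stand-in for the hypothetical `locator`.
public class MessageStore
{
    public List<Message> Messages { get; } = new List<Message>();

    // Oldest unprocessed message, or null when the queue is drained.
    public Message NextUnprocessed() =>
        Messages.Where(m => !m.Processed)
                .OrderBy(m => m.CreatedDate)
                .FirstOrDefault();
}
```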


In a failure situation, if you don’t mark a message as processed, the problematic message can hijack your message processor, i.e. the processor will pick up the same message in every cycle and never reach the messages behind it. Therefore the message should be marked as processed whether or not processing succeeded:


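try
{
    foreach (IMessageProcessor p in Processors)
    {
        try
        {
            if (p.IsValid(message, context))
            {
                p.Process(message, context);
            }
        }
        catch (Exception ex)
        {
            // Generic handling: log ex, notify someone, etc.
        }
    }
}
finally
{
    // Mark the message even if processing failed, so it cannot block the queue
    message.MarkAsProcessed();
}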
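Putting fetching and fail-proof processing together, the sketch below runs one polling cycle at a time over an in-memory list. `MessageEngine`, `PickyProcessor` and the `"Poison"` message type are hypothetical. Recording failed ids is an assumed placeholder for notifying someone so the message can be corrected later. The poison message is tried once, recorded, and marked as processed, so the later messages still get through.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public class Message
{
    public int Id { get; set; }
    public string Type { get; set; } = "";
    public DateTime CreatedDate { get; set; }
    public bool Processed { get; private set; }
    public void MarkAsProcessed() => Processed = true;
}

public class Context { }

public interface IMessageProcessor
{
    void Process(Message message, Context context);
    bool IsValid(Message message, Context context);
}

// Hypothetical processor that chokes on one kind of message.
public class PickyProcessor : IMessageProcessor
{
    public List<int> Done { get; } = new List<int>();
    public bool IsValid(Message message, Context context) => true;
    public void Process(Message message, Context context)
    {
        if (message.Type == "Poison") throw new FormatException("cannot parse payload");
        Done.Add(message.Id);
    }
}

public class MessageEngine
{
    private readonly List<Message> store;
    private readonly List<IMessageProcessor> processors;
    public List<int> FailedIds { get; } = new List<int>();

    public MessageEngine(List<Message> store, List<IMessageProcessor> processors)
    {
        this.store = store;
        this.processors = processors;
    }

    // One polling cycle: take the oldest unprocessed message and run it through every processor.
    // Returns false when there is nothing left to do.
    public bool RunCycle(Context context)
    {
        Message message = store.Where(m => !m.Processed).OrderBy(m => m.CreatedDate).FirstOrDefault();
        if (message == null) return false;

        try
        {
            foreach (IMessageProcessor p in processors)
            {
                try
                {
                    if (p.IsValid(message, context)) p.Process(message, context);
                }
                catch (Exception)
                {
                    // Record the failure for follow-up instead of retrying forever.
                    FailedIds.Add(message.Id);
                }
            }
        }
        finally
        {
            message.MarkAsProcessed();
        }
        return true;
    }
}
```

Without the `finally`, the poison message would stay unprocessed and be returned by the query in every cycle.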

