Wiring up an event-message handling service


Recently, in a project I’m working on, I needed to create a service that would let me monitor what was going on in the application. In this case, log file info wasn’t rich enough for the type of analysis required. We wanted a contextualized view of what was going on at any moment within this application, and also a way to measure key aspects of its use. In a sense, it would be an implementation of some of the control patterns described in Architecting Enterprise Solutions: Patterns for High-capability Internet-based Systems (Wiley Software Patterns Series), such as the System Monitor.

Lately I’ve been looking at CQRS and have become quite interested in it. I like the concept and what it offers, especially when used with event sourcing. Unfortunately, I haven’t been able to apply it, mainly for lack of experience with it and of time to learn how to do it properly, especially considering the need for features in a time-constrained production application. For this particular case, though, I figured I could use some of its concepts to get to a solution that does what I want.

The Requirement

I currently have an application that handles a specific business domain. I want to capture information about what is going on within the application – what business processes are running, etc. – and also capture usage statistics – processing time, number of calls, etc. – in order to identify bottlenecks and allow managers to focus our team’s sprints on the most used aspects (especially if there are problems with them).

Also, I find it very important not to pollute my business logic code with monitoring concerns – I believe the business logic code should be clean and simple and as close to the domain as possible. Therefore, I think capturing this information should be tied to an application-level service external to the domain code (possibly inserted through IOC-based interceptors), and the processing of this information should happen in an external process.

The next diagram provides an overview of what I want:

CQRS includes a concept of event-handling mechanisms that can populate read models. Event processing services handle domain-emitted events, and use that event information to update a model designed to match the presentation layer’s requirements. In my case, I’m considering capturing events (both domain-specific and application-specific, depending on what I need) that my application publishes. These events will be represented as messages passed through a communication channel, with an endpoint that could be a web service (RPC style) or a subscriber on a message bus. The subscribing endpoint is a new context where the event messages will be consumed and processed to update the read model. The read model will be designed to optimize reads by a GUI for dashboards and/or reporting. This data doesn’t need to be updated in real time, so the processing associated with the messages can be done asynchronously and throttled if necessary, although care may be needed in dealing with the order of the event messages.

A high-level view of the monitoring context is presented in the next diagram.

The flow is quite simple, really. Event messages are published to the channel that our monitoring context is subscribed to. We’ll have a channel adapter on our endpoint to connect the service technology to the channel correctly. The message is passed to a process manager that has a certain “saga” aspect to it, not so much for its state-machine-like nature, but because it controls the processing lifecycle or flow. The processor will request from the factory the handler objects that can handle whatever event type was passed in to the service. The factory will use a handler registry of some sort to figure out who can handle the request. The processor will then call each usable handler to handle the event. Each handler will do something with the information, updating the read model with some new state.

The handlers are, in this case, independent classes that can handle one or more event types. Each handler should be dedicated to some restricted aspect of the information we want to capture, and only handle the event types that are useful to produce that information. The more independent the handlers are of each other, the better, as we’ll avoid concurrency issues and can run them in parallel. Also, event message objects should be immutable, to avoid state changes within the handlers.

A great thing about this is that, since more than one handler can handle a specific event type, each handler only needs to be concerned with some part of the event’s data and only does a small, focused calculation. Algorithms and handling code should be simpler, even though there may be a large number of handlers in the context.

To cover the implementation, I’ve set up a sample solution, hosted on GitHub. It’s based on a spike I originally used to produce the solution. I recommend cloning and opening up the solution to follow along.

Domain Event Message Objects

First, let’s take a look at the domain event message objects. They are basically POCOs, all based on an abstract Event class, and grouped into a library that can be shared across applications / contexts, to simplify the code base. Note that it is not necessary to share this library – since we are passing information across bounded contexts (from the domain app to the monitoring app), our language between contexts can change. In that case, a separate library or message parser would be used to capture the data that is in a message. Nonetheless, because our sample event classes are very simple, we’ll just share the library in the example project. Sharing would probably be done through NuGet packaging.

Here’s the code for the abstract base Event class:
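A minimal sketch of such a base class follows. The property names (Id, TimeStamp) and the reflection helpers are assumptions made for illustration, so the listing in the sample solution may differ in its details:

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reflection;
using System.Runtime.Serialization;

[DataContract]
[KnownType("DerivedTypes")] // known types are supplied at runtime by the static method below
public abstract class Event
{
    // Property names are assumed for this sketch.
    [DataMember] public Guid Id { get; private set; }
    [DataMember] public DateTime TimeStamp { get; private set; }

    protected Event(Guid id, DateTime timeStamp)
    {
        Id = id;
        TimeStamp = timeStamp;
    }

    // KnownType(string) needs a static, parameterless method returning IEnumerable<Type>.
    private static IEnumerable<Type> DerivedTypes()
    {
        return GetDerivedTypes(typeof(Event));
    }

    // Scans the loaded assemblies for concrete subclasses of baseType.
    private static IEnumerable<Type> GetDerivedTypes(Type baseType)
    {
        return AppDomain.CurrentDomain.GetAssemblies()
            .Where(a => !a.IsDynamic)
            .SelectMany(a => LoadableTypes(a))
            .Where(t => t.IsSubclassOf(baseType) && !t.IsAbstract)
            .ToList();
    }

    // Some assemblies cannot load all of their types; keep the ones that did load.
    private static IEnumerable<Type> LoadableTypes(Assembly assembly)
    {
        try { return assembly.GetTypes(); }
        catch (ReflectionTypeLoadException ex) { return ex.Types.Where(t => t != null); }
    }
}
```

The private setters keep the event read-only for callers while still letting the DataContractSerializer populate the members on deserialization.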

All it has is a Guid and a DateTime that describe and distinguish each individual event. They are set through the constructor when the object instance is created. Because event POCOs must be serializable, I’ve added the DataContract attribute to the class. Also, since Event is abstract, we actually need to be able to (de)serialize derived types, so the KnownType attribute was also added. Typically, you reference specific known types, but since we need them to be discovered at runtime, KnownType is given the name of a method that is a private static member of the Event class:

DerivedTypes() searches for and returns all types that derive from Event. GetDerivedTypes() is reusable and can be moved to a framework-level class.

Next, we have the ProcessStarted class, a sample domain event, that describes a (domain) process that was initiated, somewhat like a long-running process (but could be any type of process really).

In this case, it adds a ProcessId guid and a ProcessTypeId guid to the event class. The ProcessId is an individual process identifier, and anything related to the process request will share this id. The ProcessTypeId is for categorizing the process types. Any other relevant properties could be added to the event POCO that describes the event. The source code also includes a ProcessEnded class that can represent an event to be emitted at the end of the process, with details about how long the process ran and if it ended with errors.
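For reference, the two events could be sketched as below. ProcessId and ProcessTypeId come from the description above; Duration and EndedWithErrors are names I am assuming for the “how long” and “ended with errors” details, and the base class is a trimmed copy so that the sketch compiles on its own:

```csharp
using System;
using System.Runtime.Serialization;

// Trimmed copy of the base class (no KnownType discovery here).
[DataContract]
public abstract class Event
{
    [DataMember] public Guid Id { get; private set; }
    [DataMember] public DateTime TimeStamp { get; private set; }

    protected Event(Guid id, DateTime timeStamp) { Id = id; TimeStamp = timeStamp; }
}

[DataContract]
public class ProcessStarted : Event
{
    [DataMember] public Guid ProcessId { get; private set; }      // identifies one process run
    [DataMember] public Guid ProcessTypeId { get; private set; }  // categorizes the process

    public ProcessStarted(Guid id, DateTime timeStamp, Guid processId, Guid processTypeId)
        : base(id, timeStamp)
    {
        ProcessId = processId;
        ProcessTypeId = processTypeId;
    }
}

[DataContract]
public class ProcessEnded : Event
{
    [DataMember] public Guid ProcessId { get; private set; }
    [DataMember] public TimeSpan Duration { get; private set; }     // assumed property name
    [DataMember] public bool EndedWithErrors { get; private set; }  // assumed property name

    public ProcessEnded(Guid id, DateTime timeStamp, Guid processId,
                        TimeSpan duration, bool endedWithErrors)
        : base(id, timeStamp)
    {
        ProcessId = processId;
        Duration = duration;
        EndedWithErrors = endedWithErrors;
    }
}
```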

All in all, they are simple POCOs that can be serialized and transferred on the wire to the monitoring context. There, the data in them will be handled by event processors. Once again, serialization is a concern, and each type derived from Event will require the DataContract attribute.

Note: I believe that the DataContract / serialization attributes are the only infrastructure concerns attached to these classes. Whether that is acceptable or not is up for debate. If it were possible to remove these, or better yet, move them to an infrastructure / application level, then the application could decide what type of serialization mechanisms and attribute types to use. This is really only to point out that if, let’s say, I wanted to use MessageContract attributes, or XmlElement attributes for the serialization, I would have to change the domain classes or at least their decorators. Seems to me to be a bit of a violation of the Open-Closed Principle...

A Look at the Event Processors

The event processors are classes that are marked to handle specific groups of events. Each processor handles whatever event types it needs to get its job done, and this is done by implementing the open-generic interface IHandleEvent<>.

The source code for this part is made up of three projects. There is a very small and generic “EventProcessing.Framework”, which defines the previously mentioned IHandleEvent<> interface:

The interface is very focused, and only demands the Handle() method. It also restricts handled types to Event derivatives. Our framework also contains an IProcessEvents marker interface and the Entity abstract base class. Entity here might not be a good name. I consider the inheriting classes to be a sort of aggregate roots within my monitoring context (although my understanding of ARs might be hindered here by my lack of experience with correct DDD – please correct me if I am wrong). Entities, in this framework library, contain only an externally set id, passed in to the constructor. These three types are grouped into their own library for reuse.
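As a rough sketch, the three framework types could look like this. The string-typed Id on Entity is an assumption on my part, and Event is reduced to an empty stand-in so the block compiles alone:

```csharp
using System;

public abstract class Event { }   // stand-in for the shared event base class

// Implemented once for each event type a processor wants to receive.
public interface IHandleEvent<TEvent> where TEvent : Event
{
    void Handle(TEvent @event);
}

// Marker interface: flags a class as an event processor.
public interface IProcessEvents { }

// Base class for the read-model objects that the processors update.
public abstract class Entity
{
    protected Entity(string id)
    {
        if (id == null) throw new ArgumentNullException("id");
        Id = id;
    }

    // Set externally, once, through the constructor.
    public string Id { get; private set; }
}
```

The generic constraint is what restricts handlers to Event derivatives: something like IHandleEvent<string> simply does not compile.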

The other two projects associated with the monitoring context are the DomainAEventProcessors library and its unit test library. This library has a DailyActivityEventProcessor that determines the number of active processes at the moment, and a ProcessTypeCounterEventProcessor that counts processes by type. Both can be used to update the read model used by a dashboard UI. Both retrieve the data of an entity (or AR), act on it by calling commands, and persist it through an IRepository implementation.

Let’s look specifically at the DailyActivityEventProcessor class. Because it is an event processor, it implements the marker interface IProcessEvents, and also handles the ProcessStarted and ProcessEnded event types:

The IRepository implementation is injected at construction, and the implementation will typically be tied to some storage type (SqlServer, MongoDb, etc.). The Handle() methods have straightforward implementations: they get the current entity state from the repository, execute a command on it, and store it again. There are obviously concurrency issues to consider, but I am not addressing them here, to keep the example simple.

The DailyActivity class contains an internal dictionary to store process statuses keyed by id, and has a set of properties that determine, based on the dictionary’s data, the number of active processes. Because it is a daily record, the id for the entity is simply the date the process was executed, converted to string.

Notice that the class has two methods that allow for command-style calls – AddProcess() and EndProcess(). These essentially update the process list, which alters the entity’s properties.
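Putting the last few paragraphs together, the processor and its entity could be sketched as follows. The IRepository<T> members (Get, Save), the date format used for the id, and the names of the counting properties are assumptions; the event and framework types are reduced stand-ins:

```csharp
using System;
using System.Collections.Generic;
using System.Globalization;
using System.Linq;

// Reduced stand-ins for the shared event and framework types.
public abstract class Event
{
    protected Event(DateTime timeStamp) { TimeStamp = timeStamp; }
    public DateTime TimeStamp { get; private set; }
}
public class ProcessStarted : Event
{
    public ProcessStarted(DateTime timeStamp, Guid processId) : base(timeStamp) { ProcessId = processId; }
    public Guid ProcessId { get; private set; }
}
public class ProcessEnded : Event
{
    public ProcessEnded(DateTime timeStamp, Guid processId) : base(timeStamp) { ProcessId = processId; }
    public Guid ProcessId { get; private set; }
}
public interface IHandleEvent<TEvent> where TEvent : Event { void Handle(TEvent @event); }
public interface IProcessEvents { }
public abstract class Entity
{
    protected Entity(string id) { Id = id; }
    public string Id { get; private set; }
}

// Assumed repository shape: Get() returns null when nothing is stored under the id.
public interface IRepository<T> where T : Entity
{
    T Get(string id);
    void Save(T entity);
}

public class DailyActivity : Entity
{
    // process id -> is the process still running?
    private readonly Dictionary<Guid, bool> _processes = new Dictionary<Guid, bool>();

    public DailyActivity(string id) : base(id) { }

    public int TotalProcesses { get { return _processes.Count; } }
    public int ActiveProcesses { get { return _processes.Count(p => p.Value); } }

    public void AddProcess(Guid processId) { _processes.Add(processId, true); } // throws on a repeated id
    public void EndProcess(Guid processId) { _processes[processId] = false; }
}

public class DailyActivityEventProcessor
    : IProcessEvents, IHandleEvent<ProcessStarted>, IHandleEvent<ProcessEnded>
{
    private readonly IRepository<DailyActivity> _repository;

    public DailyActivityEventProcessor(IRepository<DailyActivity> repository)
    {
        _repository = repository;
    }

    public void Handle(ProcessStarted @event)
    {
        var activity = Load(@event.TimeStamp);
        activity.AddProcess(@event.ProcessId);
        _repository.Save(activity);
    }

    public void Handle(ProcessEnded @event)
    {
        var activity = Load(@event.TimeStamp);
        activity.EndProcess(@event.ProcessId);
        _repository.Save(activity);
    }

    // One record per day: the entity id is the event date as a string.
    private DailyActivity Load(DateTime timeStamp)
    {
        var id = timeStamp.ToString("yyyy-MM-dd", CultureInfo.InvariantCulture);
        return _repository.Get(id) ?? new DailyActivity(id);
    }
}
```

Both Handle() methods follow the same load, command, save sequence, which is why a second ProcessStarted for the same process id fails in AddProcess() rather than being silently counted twice.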

Dispatching Events to Handlers

Now that we have events and event processors, it’s time to wire everything up. A lot of what needs to be done isn’t so much a domain-specific aspect of the monitoring context, but more of an infrastructure concern. Because of this, I’ve tried to do as much as possible with the IOC container (which in this case will be Castle Windsor). Also, to support the service, I’ll be using a WCF service, with the endpoint based on an HTTP binding for SOAP messages, but any other type of service could be used.

The first thing I’ll add is the dispatcher, which will get the handlers for a specific event type from the registry, and call Handle() on each of them with the event as an argument. I’ll add this to the framework since it is reusable. The dispatcher implements a very simple interface:

And it has quite a simple implementation:

Do take note of a couple of things:

  • The EventDispatcher implementation gets an IHandlerFactory injected. The factory can be implemented in many ways, for example based on a static map of types to handlers, or using IOC-related functionality.
  • GetHandlersFor() returns an IEnumerable<dynamic>. The fact that we are using open generics (the IHandleEvent<> interface) complicates things slightly, and requires us to use dynamics. Types will be resolved at runtime. This was the part where I got lost the most, mainly because I rarely use these features.
  • We are using a Parallel.ForEach to iterate through the handlers, because they are independent (or at least we implement them as such). This is an advantage of using simple, independent handler implementations.
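The three points above can be sketched as follows. The member names follow the ones used in this article (DispatchEvent(), GetHandlersFor()); the exact signatures are my assumption, and Event is an empty stand-in:

```csharp
using System.Collections.Generic;
using System.Threading.Tasks;

public abstract class Event { }   // stand-in for the shared event base class

public interface IEventDispatcher
{
    void DispatchEvent(Event @event);
}

public interface IHandlerFactory
{
    // dynamic, because the closed IHandleEvent<T> type is only known at runtime.
    IEnumerable<dynamic> GetHandlersFor(Event @event);
}

public class EventDispatcher : IEventDispatcher
{
    private readonly IHandlerFactory _factory;

    public EventDispatcher(IHandlerFactory factory)
    {
        _factory = factory;
    }

    public void DispatchEvent(Event @event)
    {
        var handlers = _factory.GetHandlersFor(@event);

        // Handlers are assumed to be independent, so they may run concurrently.
        Parallel.ForEach(handlers, handler =>
        {
            // Receiver and argument are both dynamic: the Handle(T) overload that
            // matches the event's runtime type is bound when the call executes.
            handler.Handle((dynamic)@event);
        });
    }
}
```

Two things to keep in mind with this approach: a handler that implements Handle() explicitly (void IHandleEvent<T>.Handle(...)) would not be found by the runtime binder, and exceptions thrown inside Parallel.ForEach reach the caller wrapped in an AggregateException.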

The handler factory is also quite simple, and reinforces the dynamic nature of the handlers:

The dispatch mechanism is testable:

Here, two mock handlers were created for a fake event. The factory returns both mocks when called, and we verify that the Handle() method of each one was called.
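The same check can be written without a mocking library, using hand-written fakes. This is only a sketch of the idea, not the test from the sample solution: FakeEvent, CountingHandler and FixedHandlerFactory are made-up names, and the framework types are compact copies so the block compiles alone:

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

// Compact copies of the framework types.
public abstract class Event { }
public interface IHandleEvent<TEvent> where TEvent : Event { void Handle(TEvent @event); }
public interface IHandlerFactory { IEnumerable<dynamic> GetHandlersFor(Event @event); }

public class EventDispatcher
{
    private readonly IHandlerFactory _factory;
    public EventDispatcher(IHandlerFactory factory) { _factory = factory; }

    public void DispatchEvent(Event @event)
    {
        Parallel.ForEach(_factory.GetHandlersFor(@event), h => h.Handle((dynamic)@event));
    }
}

public class FakeEvent : Event { }

// Hand-written fake: counts how often Handle() was called.
public class CountingHandler : IHandleEvent<FakeEvent>
{
    private int _calls;
    public int Calls { get { return _calls; } }
    public void Handle(FakeEvent @event) { Interlocked.Increment(ref _calls); }
}

// Fake factory that always returns the handlers it was given.
public class FixedHandlerFactory : IHandlerFactory
{
    private readonly object[] _handlers;
    public FixedHandlerFactory(params object[] handlers) { _handlers = handlers; }
    public IEnumerable<dynamic> GetHandlersFor(Event @event) { return _handlers; }
}

public static class EventDispatcherTests
{
    public static void DispatchEvent_CallsEveryHandlerOnce()
    {
        var first = new CountingHandler();
        var second = new CountingHandler();
        var dispatcher = new EventDispatcher(new FixedHandlerFactory(first, second));

        dispatcher.DispatchEvent(new FakeEvent());

        if (first.Calls != 1 || second.Calls != 1)
            throw new Exception("each handler should have been called exactly once");
    }
}
```

In a real test project the final check would be an assertion from the test framework in use; the counter is incremented with Interlocked because the dispatcher may call handlers from several threads.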

Using Castle to Resolve Handlers

I only added the IHandlerFactory interface to the framework, but no implementation for it. We can implement this factory in a couple of ways, but a really clean and interesting one is to use the IOC container. Castle Windsor’s typed factory facility has an AsFactory() method that generates the implementation of a factory interface and registers it in the container. The only extra work is to create a selector definition when the default selection method is not useful.

I’ll start by adding the service application, which I mentioned is a WCF service (application) and that I’ll call MonitoringService. It’ll have a single service endpoint (MonitorService.svc) that implements the IMonitor ServiceContract:

Because Event is abstract and we need to deserialize its derived types, the ServiceKnownType attribute was added, and known types are to be discovered via a RegisterKnownTypes() method on the ServiceKnownTypeRegister class:

Like what we did for the Event class DataContract, we’ll scan the assemblies and discover any and all registered Event derivatives as deserialization candidates for the @event argument of the ReceiveEvent() service method. GetDerivedTypes() is repeated here, but could be moved to some utility or framework level library.
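A sketch of what such a register can look like is shown below. WCF requires the method named in ServiceKnownType to be static, take an ICustomAttributeProvider, and return IEnumerable<Type>; the scanning helper and the stand-in event types are assumptions for illustration. The service contract is shown only as a comment, since it needs System.ServiceModel:

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reflection;

public abstract class Event { }            // stand-in for the shared base class
public class ProcessStarted : Event { }    // stand-in for a concrete event

// Intended usage on the contract (comment only, requires WCF):
//   [ServiceContract]
//   [ServiceKnownType("RegisterKnownTypes", typeof(ServiceKnownTypeRegister))]
//   public interface IMonitor { [OperationContract] void ReceiveEvent(Event @event); }
public static class ServiceKnownTypeRegister
{
    // Called by WCF to collect the types it may find in place of Event.
    public static IEnumerable<Type> RegisterKnownTypes(ICustomAttributeProvider provider)
    {
        return GetDerivedTypes(typeof(Event));
    }

    // Same scan as in the Event class: concrete subclasses in the loaded assemblies.
    private static IEnumerable<Type> GetDerivedTypes(Type baseType)
    {
        return AppDomain.CurrentDomain.GetAssemblies()
            .Where(a => !a.IsDynamic)
            .SelectMany(a => LoadableTypes(a))
            .Where(t => t.IsSubclassOf(baseType) && !t.IsAbstract)
            .ToList();
    }

    // Some assemblies cannot load all of their types; keep the ones that did load.
    private static IEnumerable<Type> LoadableTypes(Assembly assembly)
    {
        try { return assembly.GetTypes(); }
        catch (ReflectionTypeLoadException ex) { return ex.Types.Where(t => t != null); }
    }
}
```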

Back to wire-up: we need to do a couple of things to get Castle to control object instance creation in the WCF application. First, we need to create the container, in order to have a composition root, which we’ll do in a ContainerManager static class:

It needs to be instantiated in the global.asax:

The ContainerManager creates a new WindsorContainer and adds a couple of facilities we will need, namely the WcfFacility for WCF integration, and the TypedFactoryFacility to create factories. The container is created once, at application start (right before we call the CallInstallers() method), and then registers all the dependencies that need to be resolved by calling each available installer in the application (when the CallInstallers() method is called). We create it in Application_Start() from the global.asax because we are using an HTTP version of the service. In other scenarios the HttpApplication is not available. Still, somewhere at application start, the container should be created and items registered for resolution.

By considering our implementation for IMonitor, we can pretty much get to every component we will need to register.

MonitorService’s ReceiveEvent() implementation does only one thing – call the dispatcher’s DispatchEvent() to pass the event to the handlers. The class will therefore require an IEventDispatcher to be injected. MonitorService will no longer work with a default constructor, so we need to change the .svc so that Castle can handle service instance creation:

The ServiceHost directive now has the Service attribute that names the service, and a Factory attribute that determines that Windsor will create the service instances.

With this in mind, let’s consider the set of dependencies:

  • The container should resolve IMonitor as MonitorService.
  • MonitorService requires an IEventDispatcher, resolved to EventDispatcher.
  • EventDispatcher requires an IHandlerFactory, which will be based on Castle’s typed factories.
  • The handler factory will be creating IHandleEvent<> instances, so we will need to register all those that are available.
  • The handlers require IRepository<>, so we will need to register those available, and possibly consider conventions for selecting repository types.

I like to split the statements associated with Installers into different files, depending on their focus. In this case there are three. The first is ServiceInstaller.cs, which registers IMonitor, IEventDispatcher and IHandlerFactory:

Notes about this installer:

  • IMonitor is registered with a name. The name is used by HostFactory to resolve the service implementation and should match the name in the .svc file.
  • IEventDispatcher registration is straightforward.
  • IHandlerFactory registration is two-step. We use the .AsFactory() extension method to create the factory. We also register an ITypedFactoryComponentSelector to be used by the factory to contain the logic required to select the types to be used.

Because we are using open generics, and because we need to do casting, we can’t rely on Castle’s default factory selectors to resolve the instances. HandleEventFactorySelector, a class we have defined, will be used as the selection mechanism for the factory:

The selector is quite an interesting piece of code. It inherits from DefaultTypedFactoryComponentSelector, so that you only need to override whatever part of the default selector needs to be changed. For our factory, three methods need to be changed: GetComponentName(), GetComponentType(), and BuildFactoryComponent(). The factory usually calls the three methods in that order (which you can confirm in the debugger): it gets the name and type of the components we want to resolve and, finally, the delegate that resolves the instances based on the input. I’ve added a few overrides:

  • Since we want to resolve IHandleEvent<> based on the event type, we won’t be resolving components by name. Therefore GetComponentName() returns null, so that the component name is ignored.
  • We do want to resolve by type, based on the event type. Therefore, GetComponentType() creates a Type of IHandleEvent<>, and makes it a generic of the event type.
  • Since our factory will be returning multiple instances, instead of a single instance, BuildFactoryComponent() returns a delegate that calls the container’s ResolveAll(), for the type we determined, and casts each one to dynamic. The cast is important, and without it, we’ll get null instances or runtime errors.
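Stripped of the Castle plumbing, the logic in the last two bullets comes down to closing the open generic over the event’s runtime type and returning every match as dynamic. The sketch below uses a plain list in place of the container; HandlerRegistry and StartedCounter are hypothetical names that are neither part of Castle Windsor nor of the sample solution:

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public abstract class Event { }
public class ProcessStarted : Event { }
public class ProcessEnded : Event { }
public interface IHandleEvent<TEvent> where TEvent : Event { void Handle(TEvent @event); }

// Sample handler, only here so the registry has something to return.
public class StartedCounter : IHandleEvent<ProcessStarted>
{
    public int Count { get; private set; }
    public void Handle(ProcessStarted @event) { Count++; }
}

public class HandlerRegistry
{
    private readonly List<object> _handlers = new List<object>();

    public void Register(object handler) { _handlers.Add(handler); }

    // IHandleEvent<> closed over the event's runtime type,
    // e.g. IHandleEvent<ProcessStarted> for a ProcessStarted instance.
    public static Type HandlerTypeFor(Event @event)
    {
        return typeof(IHandleEvent<>).MakeGenericType(@event.GetType());
    }

    // Every registered handler that implements that closed interface, as dynamic.
    public IEnumerable<dynamic> GetHandlersFor(Event @event)
    {
        var handlerType = HandlerTypeFor(@event);
        return _handlers.Where(h => handlerType.IsInstanceOfType(h)).ToList();
    }
}
```

In the Castle version, the list lookup is what the container’s ResolveAll() takes care of, for the type computed in GetComponentType().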

This bit of logic – the selector and the AsFactory() – allows us to use our IOC container as the handler registry, and it will take care of creating the instances of the handlers we need to handle the received events.

We still need to register the handlers though. The associated logic is in HandlerInstaller.cs:

I based this installer code on a bit I found in SimpleInjector’s documentation. Basically, what it is doing is going through all the loaded assemblies and registering any IHandleEvent<> it finds, with some extra filtering and an added selection delegate. This code could be changed to load assemblies in a specific folder, as if they were plugins, and the app would not need to know about them at compile time.
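Leaving the container-specific registration calls aside, the discovery part can be sketched with plain reflection. HandlerScanner and ProcessCounter are made-up names for this illustration; each pair it yields is what would then be registered with the container:

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reflection;

public abstract class Event { }
public class ProcessStarted : Event { }
public class ProcessEnded : Event { }
public interface IHandleEvent<TEvent> where TEvent : Event { void Handle(TEvent @event); }

// Sample processor handling two event types, so the scan has something to find.
public class ProcessCounter : IHandleEvent<ProcessStarted>, IHandleEvent<ProcessEnded>
{
    public void Handle(ProcessStarted @event) { }
    public void Handle(ProcessEnded @event) { }
}

public static class HandlerScanner
{
    // Yields (closed IHandleEvent<T> interface, implementing class) pairs
    // for every public, concrete class in the given assemblies.
    public static IEnumerable<KeyValuePair<Type, Type>> FindHandlers(IEnumerable<Assembly> assemblies)
    {
        return from assembly in assemblies
               where !assembly.IsDynamic   // GetExportedTypes() is not supported there
               from type in assembly.GetExportedTypes()
               where type.IsClass && !type.IsAbstract
               from service in type.GetInterfaces()
               where service.IsGenericType
                     && service.GetGenericTypeDefinition() == typeof(IHandleEvent<>)
               select new KeyValuePair<Type, Type>(service, type);
    }
}
```

For the plugin scenario, the assemblies passed in would come from loading the files in a folder (for example with Assembly.LoadFrom) instead of from the already loaded set.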

Finally, because our handlers require repositories, we’ll install them.

Castle in this case can automatically resolve the generic type, since we have been explicit about that in the handler constructors. We’ll use a very simple InMemoryRepository implementation added at the application level. In pretty much all of the installers, we’ve omitted lifestyle definitions. Castle by default creates singletons, which for our current setup is ok.
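An in-memory repository along these lines is enough for the example. The IRepository<T> members (Get, Save) are my assumption about its shape, and DailyTotal is a made-up entity used only to exercise it:

```csharp
using System.Collections.Concurrent;

public abstract class Entity
{
    protected Entity(string id) { Id = id; }
    public string Id { get; private set; }
}

public interface IRepository<T> where T : Entity
{
    T Get(string id);      // null when nothing is stored under the id
    void Save(T entity);
}

// Keeps entities in a dictionary for the lifetime of the instance.
public class InMemoryRepository<T> : IRepository<T> where T : Entity
{
    private readonly ConcurrentDictionary<string, T> _items =
        new ConcurrentDictionary<string, T>();

    public T Get(string id)
    {
        T item;
        return _items.TryGetValue(id, out item) ? item : null;
    }

    public void Save(T entity)
    {
        _items[entity.Id] = entity;   // insert or overwrite
    }
}

// Sample entity for trying the repository out.
public class DailyTotal : Entity
{
    public DailyTotal(string id) : base(id) { }
    public int Value { get; set; }
}
```

Because the repository is a singleton and handlers may run in parallel, a ConcurrentDictionary keeps the lookups safe. It does not protect the entities themselves, which is part of the concurrency work left out of this example.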

At this point, everything is set up and we can run the app and accept events.

Testing the WCF Service

A great way to create an acceptance test (or at least something that tests the deserialization mechanism) is to throw SOAP messages at a running instance of the service (debug mode works well for this). I like to use SOAP UI for this type of verification.

The most important aspect to consider in our requests is the message format and the extra elements to include so that WCF’s deserialization mechanisms can deserialize the event automatically. A sample request for a ProcessStarted event looks like this:

SOAP UI will generally create requests based on the WSDL, but won’t include specific event type properties into the sample request. I’ve highlighted lines 4 and 5 since they are the additional namespaces that were added (:mon for the domain events and :i for the type attribute), and at line 11, where the event properties are defined, the type attribute specifies that it is a ProcessStarted event object. That will aid the deserialization process.

Executing the request will generate a response with a SOAP envelope and a body, though the body will have no content because the method has a void return type. Do note that since we are using an in-memory repository with a singleton lifestyle, multiple calls with the same event (or at least the same processId) will generate an exception. Still, with this request you can debug and/or view the call lifecycle.

Concluding Notes

This pretty much wraps it up. We’ve made it quite far, with a complete implementation. We have created an architecture that supports creating new event types, and handlers for those types, and automatically registering them in the application when the application starts up. Also, this architecture allows us to do multiple types of calculations and processing, in parallel, based on the same set of events.

In my opinion, the various aspects are correctly segregated, and separation of concerns is considered:

  • Event descriptions are in their own library and can be shared across projects;
  • Processors are separated and can be organized in different libraries based on the events they handle or on what they are focused on;
  • Base classes and interfaces are in framework level libraries and can be reused across projects;
  • Infrastructure-classes are defined and attached at the application level.

ReSharper’s architecture view shows the following graph, which reflects the previous list of notes:

Nonetheless, there are plenty of improvements that can be added. Some that occur to me are:

  • Out-of-order message handling (like getting a ProcessEnded event message before a ProcessStarted message for the same processId)
  • Handling repeated messages (what if a specific ProcessStarted message arrives twice?)
  • With calculations like those in DailyActivity, how do you handle processes that start on one day and end on the next? (this is probably more of a handler-specific concern than a framework concern…)
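For the repeated-message case, one possible direction (not something the sample solution does) is to remember the ids of the events already handled and skip repeats. ProcessedEventLog is a hypothetical helper, sketched here in memory only:

```csharp
using System;
using System.Collections.Concurrent;

// Hypothetical helper: remembers which event ids were already handled.
public class ProcessedEventLog
{
    private readonly ConcurrentDictionary<Guid, bool> _seen =
        new ConcurrentDictionary<Guid, bool>();

    // True the first time an id is offered, false for every repeat.
    public bool TryMarkProcessed(Guid eventId)
    {
        return _seen.TryAdd(eventId, true);
    }
}
```

The dispatcher (or each handler) would call TryMarkProcessed() with the event’s id and return early when it gets false. A real service would need to persist this log, since an in-memory one forgets everything on restart.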

Suggestions for Improvements?

I wanted to post this article and solution for a couple of reasons. First, it was kind of tough to get this set up, especially due to the use of dynamics, open generics, and some of Castle’s features that I wasn’t used to using. Once it is put together, though, everything makes sense. It did help me get a better understanding of how Castle works, and also of some good uses for C# features.

Still, this type of design is kind of new to me, especially from a usage point of view (I’ve been looking into it a lot lately, but this is the first time I’m actually using it). Therefore, any suggestions and improvements are welcome and highly appreciated.

The source code for what I have presented here is on GitHub (https://github.com/MiguelAlho/C--Event-Processors-Example) and can naturally be cloned for use.