CQRS and ES on Azure Table Storage

Lately I have been playing with Event Sourcing and the Command Query Responsibility Segregation (aka CQRS) pattern on Azure Table storage, and I thought of creating a lightweight library that facilitates writing such applications. I ended up with a NuGet package to do this. Here is the GitHub Repository.

A lightweight CQRS-supporting library with an Event Store based on Azure Table Storage.

Quick start guide

Install

Install the SuperNova.Storage NuGet package into the project.

Install-Package SuperNova.Storage -Version 1.0.0

The dependencies of the package are:

  • .NETCoreApp 2.0
  • Microsoft.Azure.DocumentDB.Core (>= 1.7.1)
  • Microsoft.Extensions.Logging.Debug (>= 2.0.0)
  • SuperNova.Shared (>= 1.0.0)
  • WindowsAzure.Storage (>= 8.5.0)

Implementation guide

Write Side – Event Sourcing

Once the package is installed, we can start sourcing events in an application. For example, let’s start with the canonical example of a UsersController in a Web API project.

We can use dependency injection to make the EventStore available in our controller.

Here’s an example where we register an instance of the Event Store with the DI framework in our Startup.cs:

// Config object encapsulates the table storage connection string
services.AddSingleton(new EventStore( ... provide config ));

Now the controller:

[Produces("application/json")]
[Route("users")]
public class UsersController : Controller
{
    private readonly IEventStore eventStore;

    public UsersController(IEventStore eventStore)
    {
        this.eventStore = eventStore; // Capture the event store handle
    }

    ... other methods skipped here
}

Aggregate

Implementing event sourcing becomes much more manageable when it’s paired with Domain-Driven Design (aka DDD). We’ll assume familiarity with DDD concepts (especially Aggregate Roots).

An aggregate is our consistency boundary (read: transactional boundary) in Event Sourcing. (Technically, aggregate IDs are our partition keys in the Event Store table – therefore, we can only apply an atomic operation at the level of a single aggregate root.)
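
To make that concrete, here is a minimal sketch of how an event row could be laid out in Table storage, using the TableEntity type from the WindowsAzure.Storage package the library depends on. This layout is hypothetical – the actual schema is internal to SuperNova.Storage – but it illustrates why atomicity stops at the aggregate boundary: an Azure Table batch (entity group transaction) can only span a single partition.

using Microsoft.WindowsAzure.Storage.Table;

// Hypothetical event row layout - the library's real schema may differ.
public class EventEntity : TableEntity
{
    // PartitionKey = aggregate ID: all events of one aggregate share a partition,
    // so they can be written in a single entity group transaction.
    // RowKey = event version: keeps events ordered within the partition.
    public string EventType { get; set; } // CLR type name of the event
    public string Payload { get; set; }   // serialized event body
}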

Let’s create an Aggregate for our User domain entity:

using SuperNova.Shared.Messaging.Events.Users;
using SuperNova.Shared.Supports;

public class UserAggregate : AggregateRoot
{
    private string _userName;
    private string _emailAddress;
    private Guid _userId;
    private bool _blocked;

    ... skipped other codes
}

Once we have the aggregate class written, we should come up with the events that are relevant to this aggregate. We can use Event Storming to discover them.

Here are the events that we will use for our example scenario, along with the Apply methods that mutate the aggregate’s state:
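
The event classes themselves live in SuperNova.Shared.Messaging.Events.Users; a minimal sketch consistent with the Apply methods below might look like this (assuming a DomainEvent base type that carries AggregateId and Version – the exact base members are an assumption):

// Sketch of the event contracts - property names follow the Apply methods below.
public class UserRegistered : DomainEvent
{
    public string UserName { get; set; }
    public string Email { get; set; }
}

public class UserBlocked : DomainEvent
{
}

public class UserNameChanged : DomainEvent
{
    public string NewName { get; set; }
}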

public class UserAggregate : AggregateRoot
{
    ... skipped other codes

    #region Apply events
    private void Apply(UserRegistered e)
    {
        this._userId = e.AggregateId;
        this._userName = e.UserName;
        this._emailAddress = e.Email;
    }

    private void Apply(UserBlocked e)
    {
        this._blocked = true;
    }

    private void Apply(UserNameChanged e)
    {
        this._userName = e.NewName;
    }
    #endregion

    ... skipped other codes
}

Now that we have our business events defined, we will define our commands for the aggregate:

public class UserAggregate : AggregateRoot
{
    #region Accept commands
    public void RegisterNew(string userName, string emailAddress)
    {
        Ensure.ArgumentNotNullOrWhiteSpace(userName, nameof(userName));
        Ensure.ArgumentNotNullOrWhiteSpace(emailAddress, nameof(emailAddress));

        ApplyChange(new UserRegistered
        {
            AggregateId = Guid.NewGuid(),
            Email = emailAddress,
            UserName = userName
        });
    }

    public void BlockUser(Guid userId)
    {
        ApplyChange(new UserBlocked
        {
            AggregateId = userId
        });
    }

    public void RenameUser(Guid userId, string name)
    {
        Ensure.ArgumentNotNullOrWhiteSpace(name, nameof(name));

        ApplyChange(new UserNameChanged
        {
            AggregateId = userId,
            NewName = name
        });
    }
    #endregion

    ... skipped other codes
}

So far so good!

Now we will modify the Web API controller to send the correct command to the aggregate.

public class UserPayload
{
    public string UserName { get; set; }
    public string Email { get; set; }
}

// POST: User
[HttpPost]
public async Task<IActionResult> Post(Guid projectId, [FromBody]UserPayload user)
{
    Ensure.ArgumentNotNull(user, nameof(user));

    var userId = Guid.NewGuid();

    // Tenant is a property of the controller (not shown) identifying the current tenant
    await eventStore.ExecuteNewAsync(
        Tenant, "user_event_stream", userId, async () =>
        {
            var aggregate = new UserAggregate();

            aggregate.RegisterNew(user.UserName, user.Email);

            return await Task.FromResult(aggregate);
        });

    return new JsonResult(new { id = userId });
}

And another API to modify existing users in the system:

// PUT: User
[HttpPut("{userId}")]
public async Task<IActionResult> Put(Guid projectId, Guid userId, [FromBody]string name)
{
    Ensure.ArgumentNotNullOrWhiteSpace(name, nameof(name));

    await eventStore.ExecuteEditAsync(
        Tenant, "user_event_stream", userId,
        async (aggregate) =>
        {
            aggregate.RenameUser(userId, name);

            await Task.CompletedTask;
        }).ConfigureAwait(false);

    return new JsonResult(new { id = userId });
}

That’s it! We have our WRITE side completed. The event store now contains the events of the user event stream.

Image: EventStore table

Read Side – Materialized Views

We can consume the events in a separate console worker process and generate the materialized views for the READ side.

The readers (the console application – an Azure WebJob, for instance) act like feed processors and have their own lease collection, which makes them fault tolerant and resilient. If a reader crashes, it catches up from the last event version that was materialized successfully. It polls the event stream – deliberately avoiding a message broker (Service Bus, for instance) – to speed up event propagation and avoid latency. Scalability is ensured by dedicating a lease per tenant and event stream, which allows many readers to work in parallel.
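
Conceptually, a lease is just a checkpoint per tenant and event stream. A minimal shape (hypothetical – the real entity is internal to the library) could be:

// Hypothetical checkpoint ("lease") shape - the library's real entity may differ.
public class EventStreamLease
{
    public string Tenant { get; set; }                // one lease per tenant...
    public string EventStreamName { get; set; }       // ...and per event stream
    public long LastMaterializedVersion { get; set; } // a restarted reader resumes from here
}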

How to listen for events?

In a worker application (typically a console application) we will listen for events:

private static async Task Run()
{
    var eventConsumer = new EventStreamConsumer(
        ... skipped for simplicity
        "user-event-stream",
        "user-event-stream-lease");

    await eventConsumer.RunAndBlock((evts) =>
    {
        foreach (var evt in evts)
        {
            // readModel is our handle to the materialized-view store (setup skipped above)
            if (evt is UserRegistered userAddedEvent)
            {
                readModel.AddUserAsync(new UserDto
                {
                    UserId = userAddedEvent.AggregateId,
                    Name = userAddedEvent.UserName,
                    Email = userAddedEvent.Email
                }, evt.Version);
            }
            else if (evt is UserNameChanged userChangedEvent)
            {
                readModel.UpdateUserAsync(new UserDto
                {
                    UserId = userChangedEvent.AggregateId,
                    Name = userChangedEvent.NewName
                }, evt.Version);
            }
        }

    }, CancellationToken.None);
}

static void Main(string[] args)
{
    Run().Wait();
}

Now we have a document collection (we are using Azure Cosmos DB in this example for materialization, but it could be any database, essentially) that is updated as we store events in the event stream.

Conclusion

The library is very lightweight and heavily influenced by Greg Young’s event store and aggregate models. Feel free to use it and contribute.

Thank you!

ASP.NET 4.5 applications on Docker containers

Docker makes application deployment easier than ever before. However, most Docker articles describe how to containerize applications that run on Linux boxes. But since Microsoft has now released Windows containers, legacy .NET web apps (yes, we can consider .NET 4.5 apps legacy by now) are not left out anymore. I was playing with an ASP.NET 4.5 web application recently, trying to make a container out of it. I found the possibility exciting, not to mention that I enjoyed the process entirely. There are a few blogs that I found very useful, especially this article on FluentBytes. However, I developed my own scripts for my own application, which differ slightly from the ones on FluentBytes (thanks to the author) – I combined a few steps into the dockerfile, and that helped me get going with my own containers.
If you are reading this and trying to build a container for your ASP.NET 4.5 application, here are the steps. To explain the process, we’ll assume our application is called LegacyApp – a typical ASP.NET 4.5 application.

  • We will install the Docker host on Windows.
  • Create a directory (i.e. C:\package) that will contain all the files needed to build the container.
  • Create the web deploy package of the project from Visual Studio. The following images hint at the steps we need to follow.

Image: Create a new publish profile

Image: Use ‘Web Deploy Package’ option

Important note: we should name the site in the following format – Default Web Site\LegacyApp – to avoid additional configuration work.

  • Download WebDeploy_2_10_amd64_en-US.msi from the Microsoft web site.
  • I ran into an ACL issue while deploying the application. Thanks to the author of the FluentBytes article, who describes one way to solve the issue by running a PowerShell file during the installation. We will create that file in the same directory; let’s name it fixAcls.ps1. The content of the file can be found here.
  • We will create the dockerfile in the same directory, with the content of this sample dockerfile.

At this moment our package directory will look something like the following:

  • Now we will open a command prompt and navigate to the directory (i.e. C:\package) we have worked in so far.
  • Build the container
     c:/> docker build -t legacyappcontainer .
  • Run the container
    c:/> docker run -p 80:80 legacyappcontainer 

Now that the container is running, we can start browsing the application. Unlike with Linux containers, we can’t use localhost or 127.0.0.1 on the host machine to browse the application; we need to use the container’s IP address in the URL. Here’s what we can do:

  • We will run the inspect command to see the container IP.
     C:\> docker inspect <container id>
  • Now we will take the IP address from the JSON output and use it in the URL to navigate to the application (see the shortcut below).
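
As a shortcut, docker inspect also accepts a format string, so we can extract the IP address directly instead of scanning the JSON by eye (the network name nat is the usual default for Windows containers – verify it on your host):

     C:\> docker inspect -f "{{ .NetworkSettings.Networks.nat.IPAddress }}" <container id>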

The complete dockerfile can be found in the GitHub repo.

RabbitMQ high-availability clusters on Azure VMs

Background

Recently I had to look into a reliable AMQP solution (publish-subscribe queue model) in order to build a message broker for a large application. I started with Azure Service Bus and RabbitMQ. It didn’t take long to understand that RabbitMQ is much more attractive than Service Bus because of its efficiency and cost when handling large numbers of messages. See the image taken from Mariusz Wojcik’s blog.

Setting up RabbitMQ on a Windows machine is relatively easy; the RabbitMQ web site documents it nicely. However, when it came to installing a RabbitMQ cluster on cloud VMs, I found Linux (Ubuntu) VMs handier because they boot faster. I hadn’t used a *nix OS for quite a long time, so I found the journey interesting enough to write a post about it.

Spin up VMs on Azure

We need two Linux VMs; both will have RabbitMQ installed as a server, and they will be clustered. The high-level picture of the design looks like the following:

Log in to the Azure portal and create two VM instances based on the Ubuntu Server 14.04 LTS image in the Azure VM depot.

I have named them MUbuntu1 and MUbuntu2. The VMs need to be in the same cloud service and the same availability set to achieve redundancy and high availability. The availability set ensures that the Azure Fabric Controller will not take all the VMs down together when it performs maintenance tasks, e.g. OS patches/updates.

Once the VM instances are up and running, we need to define some endpoints for RabbitMQ, and they need to be load balanced. We go to the MUbuntu1 details in the management portal and add two endpoints: port 5672 for RabbitMQ connections from client applications, and port 15672 for the RabbitMQ management portal application. Scott Hanselman has described in detail how to create load-balanced VMs. Once we create them, it will look like the following:

Now we can SSH into both of these machines. (Azure has already mapped the SSH port 22 to a port which can be found on the right side of the dashboard page for the VM.)

Install RabbitMQ

Once we SSH into the terminals of both machines, we can install RabbitMQ by executing the following commands:

sudo add-apt-repository 'deb http://www.rabbitmq.com/debian/ testing main'
sudo apt-get update
sudo apt-get -q -y --force-yes install rabbitmq-server

The above apt-get will install Erlang and the RabbitMQ server on both machines. Erlang nodes use a cookie to determine whether they are allowed to communicate with each other – for two nodes to be able to communicate they must have the same cookie. Erlang will automatically create a random cookie file when the RabbitMQ server starts up. The easiest way to proceed is to allow one node to create the file, and then copy it to all the other nodes in the cluster. On our VMs the cookie will typically be located in /var/lib/rabbitmq/.erlang.cookie

We are going to create the cookie on both machines by executing the following commands:

echo 'ERLANGCOOKIEVALUE' | sudo tee /var/lib/rabbitmq/.erlang.cookie
sudo chown rabbitmq:rabbitmq /var/lib/rabbitmq/.erlang.cookie
sudo chmod 400 /var/lib/rabbitmq/.erlang.cookie
sudo invoke-rc.d rabbitmq-server start

Install the management portal for RabbitMQ

Now we can also install the RabbitMQ management portal so that we can monitor the queue from a browser. The following commands will install the management plugin:

sudo rabbitmq-plugins enable rabbitmq_management
sudo invoke-rc.d rabbitmq-server stop
sudo invoke-rc.d rabbitmq-server start

So far so good. Now we create a user that we will use to connect to the queue from client applications and for monitoring. Users can also be managed at any time later.

sudo rabbitmqctl add_user <username> <password>
sudo rabbitmqctl set_user_tags <username> administrator
sudo rabbitmqctl set_permissions -p / <username> '.*' '.*' '.*'

Configuring the cluster

So far we have two RabbitMQ servers up and running; it’s time to connect them as a cluster. To do so, we go to the second machine (MUbuntu2) and join it to the first node. The following commands will do that:

sudo rabbitmqctl stop_app
sudo rabbitmqctl join_cluster rabbit@MUbuntu1
sudo rabbitmqctl start_app
sudo rabbitmqctl set_cluster_name RabbitCluster

We can verify that the cluster is configured properly via the RabbitMQ management portal:

Or from the SSH terminal:
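
For instance, rabbitmqctl can print the cluster nodes and show which of them are currently running:

sudo rabbitmqctl cluster_status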

Queues within a RabbitMQ cluster are located on a single node by default; to make them highly available, they need to be mirrored across multiple nodes. Each mirrored queue consists of one master and one or more slaves, with the oldest slave being promoted to the new master if the old master disappears for any reason. Messages published to the queue are replicated to all slaves. Consumers are connected to the master regardless of which node they connect to, with slaves dropping messages that have been acknowledged at the master.

Queue mirroring therefore enhances availability, but does not distribute load across nodes (all participating nodes each do all the work). This solution requires a RabbitMQ cluster, which means that it will not cope seamlessly with network partitions within the cluster and, for that reason, is not recommended for use across a WAN (though of course, clients can still connect from as near and as far as needed).

Queues have mirroring enabled via policy. Policies can change at any time; it is valid to create a non-mirrored queue, and then make it mirrored at some later point (and vice versa). More on this is documented on the RabbitMQ site. For this example, we will mirror all queues by executing this over SSH:

sudo rabbitmqctl set_policy ha-all "" '{"ha-mode":"all","ha-sync-mode":"automatic"}'
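
To confirm that the policy has been applied, we can list the configured policies:

sudo rabbitmqctl list_policies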

That should be it. The cluster is now up and running; we can create a quick .NET console application to test it. I have created two console applications and a class library that has one class as the message contract. The VS solution looks like this:

We will use EasyNetQ to connect to RabbitMQ; we can install it via NuGet in both the publisher and subscriber projects.
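
For instance, from the Package Manager Console:

Install-Package EasyNetQ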

In the contract project (the class library), we have the following classes in a single code file:

namespace Contracts
{
    public class RabbitClusterAzure
    {
        // Fill in the cloud service host name and the RabbitMQ credentials created earlier
        public const string ConnectionString =
            @"host=<host>;username=<username>;password=<password>";
    }

    public class Message
    {
        public string Body { get; set; }
    }
}

The publisher project has the following code in Program.cs:

using System;
using EasyNetQ;

namespace Publisher
{
    class Program
    {
        static void Main(string[] args)
        {
            using (var bus = RabbitHutch.CreateBus(RabbitClusterAzure.ConnectionString))
            {
                var input = "";
                Console.WriteLine("Enter a message. 'Quit' to quit.");
                while ((input = Console.ReadLine()) != "Quit")
                {
                    Publish(bus, input);
                }
            }
        }

        private static void Publish(IBus bus, string input)
        {
            bus.Publish(new Contracts.Message
            {
                Body = input
            });
        }
    }
}

Finally, the subscriber project has the following code in Program.cs:

using System;
using EasyNetQ;

namespace Subscriber
{
    class Program
    {
        static void Main(string[] args)
        {
            using (var bus = RabbitHutch.CreateBus(RabbitClusterAzure.ConnectionString))
            {
                bus.Subscribe<Contracts.Message>("Sample_Topic", HandleTextMessage);

                Console.WriteLine("Listening for messages. Hit [Enter] to quit.");
                Console.ReadLine();
            }
        }

        static void HandleTextMessage(Contracts.Message textMessage)
        {
            Console.ForegroundColor = ConsoleColor.Red;
            Console.WriteLine("Got message: {0}", textMessage.Body);
            Console.ResetColor();
        }
    }
}

Now we can run the publisher and multiple instances of the subscriber; since the subscriber instances share the same subscription ID (and therefore the same queue), messages are dispatched to them in round-robin fashion. We can also take one of the VMs down without losing any messages.

We can also see the traffic to the VMs (including the cluster instances) directly from the Azure portal.

Conclusion

I have to admit, I found it extremely easy and convenient to configure and run RabbitMQ clusters. The steps are simple, and the setup just works.

Quick and easy self-hosted WCF services

I realized that I haven’t written a blog post in a long time. I feel bad about that; this post is an attempt to get out of the laziness.

I often find myself writing console applications that have a simple WCF service and a client that invokes it to test different things. Most of the time, I want a quick service hosted using either NetTcpBinding or WsHttpBinding with very basic configuration. That triggered the urge to write a bootstrap mechanism for easily writing, hosting, and consuming WCF services. I am planning to extend the implementation into a richer one gradually, but I already have something for a decent kick-off. Here’s how it works.

Step 1 : Creating the contract

You need to create a class library for the contract interfaces of the service you are planning to write. Something like the following:

using System.ServiceModel;

[ServiceContract(Namespace = "http://abc.com/enterpriseservices")]
public interface IWcf
{
    [OperationContract]
    string Greet(string name);
}

Now you need to copy the WcfService.cs file into the same project. This file contains one big class named WcfService, which has public methods for hosting services and for creating client proxies to invoke them. The class can be downloaded from this Git repository (https://github.com/MoimHossain/WcfServer). Once you have added it to your project, go to step 2.

Step 2 : Creating the console server project

Create a console application that will host the service. Add a reference to the project created in step 1, and define your service implementation class as follows:

// Sample service
[ServiceBehavior(InstanceContextMode = InstanceContextMode.PerCall)]
public class MyService : IWcf
{
    public string Greet(string name)
    {
        return DateTime.Now.ToString() + name;
    }
}

Finally, modify Program.cs to look something like the following:

using System;
using System.Collections.Generic;
using System.Diagnostics;

class Program
{
    static void Main(string[] args)
    {
        try
        {
            // use WcfService.Tcp for NetTcp binding or WcfService.Http for WSHttpBinding
            var hosts = WcfService.DefaultFactory.CreateServers(
                new List<Type> { typeof(MyService) },
                (t) => { return t.Name; },
                (t) => { return typeof(IWcf); },
                "WcfServices",
                8789,
                (sender, exception) => { Trace.Write(exception); },
                (msg) => { Trace.Write(msg); },
                (msg) => { Trace.Write(msg); },
                (msg) => { Trace.Write(msg); });

            Console.WriteLine("Server started....");
        }
        catch (Exception ex)
        {
            Console.WriteLine(ex);
        }
        Console.ReadKey();
    }
}

At this point you should be able to hit F5 and run the server console program.

Step 3 : Creating the console client project

Create another console application and modify Program.cs to something like the following:

using System;

class Program
{
    static void Main(string[] args)
    {
        try
        {
            // use WcfService.Tcp for NetTcp binding or WcfService.Http for WSHttpBinding
            using (var wcf = WcfService.DefaultFactory.CreateChannel<IWcf>(
                Environment.MachineName, 8789, (t) => { return "MyService"; }, "WcfServices"))
            {
                var result = wcf.Client.Greet("Moim");

                Console.WriteLine(result);
            }
        }
        catch (Exception ex)
        {
            Console.WriteLine(ex.Message);
        }
    }
}

You are good to go! Hope this helps somebody (at least myself).