The previous post focused on MSMQ fundamentals using a pull approach to retrieve messages from a queue.
This post is the start of a series that covers multiple strategies to push queued messages to clients. The intention of the push approach is to keep clients agnostic of being part of a message based architecture.
MSMQ technology is used but it is easy enough to change the implementation to use an alternative queuing solution such as Azure Service Bus.
Download Source Code
Setup
Setting up an MSQM was covered in the MSMQ fundamentals post.
The following code was used for placing 3 unique OrderModel messages in the queue.
[Serializable]
public class OrderModel
{
public int Id { get; set; }
public string Name { get; set; }
}
using (var queue = new MessageQueue(@".\Private$\Orders"))
{
queue.Formatter = new BinaryMessageFormatter();
for (var orderId = 1; orderId <= 3; orderId++)
{
var order = new OrderModel()
{
Id = orderId,
Name = string.Format("Order {0}", orderId)
};
queue.Send(order);
}
}
Command Pattern
The problem is how can a message infrastructure issue requests to objects without knowing anything about the operation being requested or the receiver of the request?
The solution is to use the command pattern to decouple senders from receivers. A command decouples the object that invokes the operation from the object that knows how to perform the operation.
The generic command interface is displayed below. The OrderProcessorCommand class implements the command interface to indicate that it can accept OrderModel messages, which it will use to simulate an order being processed.
public interface ICommand<in T>
{
void Execute(T message);
}
public class OrderProcessorCommand : ICommand<OrderModel>
{
private readonly int _id;
public OrderProcessorCommand(int id)
{
_id = id;
}
public void Execute(OrderModel message)
{
if (message == null) throw new ArgumentNullException("message");
var start = DateTime.Now;
// Simulate work being performed
Thread.Sleep(TimeSpan.FromSeconds(2));
Console.WriteLine("Processed {0} by worker {1} from {2} to {3}",
message.Name, _id, start.ToString("h:mm:ss"),
DateTime.Now.ToString("h:mm:ss"));
}
}
Note that a sleep was added on line 21 to simulate work being performed by the processor.
Queue Monitor
The queue monitor class acts as orchestrator, which is responsible for listening to the queue for incoming messages and calling a command to execute each message.
When the client calls the start method then the workflow outlined below will run consciously until the client calls the stop method:
- The BeginReceive method will kick off the queue listing operation.
- The OnReceiveComplete event will be raised when a message arrives.
- The command will be executed by passing in the message content.
public interface IQueueMonitor : IDisposable
{
void Start();
void Stop();
}
public class QueueMonitor<T> : IQueueMonitor
{
private readonly MessageQueue _queue;
private readonly ICommand<T> _command;
private bool _continue = true;
public QueueMonitor(MessageQueue queue, ICommand<T> command)
{
if (queue == null) throw new ArgumentNullException("queue");
if (command == null) throw new ArgumentNullException("command");
_queue = queue;
_command = command;
_queue.ReceiveCompleted += OnReceiveCompleted;
}
private void OnReceiveCompleted(object sender,
ReceiveCompletedEventArgs receiveCompletedEventArgs)
{
var message = _queue.EndReceive(receiveCompletedEventArgs.AsyncResult);
_command.Execute((T)message.Body);
if (_continue) _queue.BeginReceive();
}
public void Start()
{
_continue = true;
_queue.BeginReceive();
}
public void Stop()
{
_continue = false;
}
public void Dispose()
{
Dispose(true);
GC.SuppressFinalize(this);
}
protected virtual void Dispose(bool isDisposing)
{
if (!isDisposing || _queue == null) return;
_queue.ReceiveCompleted -= OnReceiveCompleted;
_queue.Dispose();
}
}
Note: The push method will not overwhelm the client with requests. The queue monitor will be blocked on line 27 and will not retrieve the next message in the queue until the command has finished processing the current request.
Single Receiver
Let's see the gears ticking over by processing the messages on the queue.
using (var queue = new MessageQueue(@".\Private$\Orders"))
{
var command = new OrderProcessorCommand(1);
var queueMonitor = new QueueMonitor<OrderModel>(queue, command);
queueMonitor.Start();
}
Processed Order 1 by worker 1 from 6:20:11 to 6:20:13
Processed Order 2 by worker 1 from 6:20:13 to 6:20:15
Processed Order 3 by worker 1 from 6:20:15 to 6:20:17
The output above shows that the single receiver processed the messages in order, one at a time.
The drawback of the single receiver is the finite amount of throughput due to the constraint of processing one message at time.
Summary
This post demonstrated a generic approach to continually monitor a queue for new messages and pushing the message content to a command to execute.
The next post will describe how to broadcast a single message across multiple processors.