Software Design Blog - Queues
Simple solutions to solve complex problems
Jay Strydom
Push queue processing boundaries with 3x greater throughput
<img class="img-responsive" alt="Broadcasting Messages" src="/pics/banners/ProcessingQueuesFasterBlog.jpg"> <br>
<p><b>The problem</b> is how to measure queue processing throughput. In a production environment, how can we add a performance counter without modifying our existing infrastructure?</p>
<p><b>The solution</b> is to add a decorator that acts as a proxy between the QueueMonitor class and the actual command processing class, intercepting the communication to add instrumentation.</p>
<p>The <a href="/post/dispatching-messages-with-the-competing-consumers-pattern">previous post</a> used the competing consumers pattern to process a queue concurrently. The <a href="/post/message-queue-delivery-strategies">message queue strategies</a> post used a single processor to process a queue.</p>
<p>In this post, we will add instrumentation to the single queue processor and the competing consumers queue processor to compare their performance.</p>
<a class="btn btn-primary btn-sm" role="button" href="/Downloads/MessageQueueExamplePerformance.zip">Download Source Code</a>
<h3>Setup</h3>
<p>The experiment will add 512 KB messages to a queue to simulate a decent workload. The intention is to read the messages from the queue and call a file-based SMTP mailer that writes the emails to disk as fast as possible.</p>
<p>The core components such as the Queue Monitor were covered in the <a href="/post/message-queue-delivery-strategies">Message Queue Delivery Strategies</a> post. 
The queue monitor is responsible for reading messages from the queue and pushing the message content to a command to execute.</p>
<p>The classes that will be used in this experiment are listed below.</p>
<pre class="brush: c-sharp;">
public class FileSmtpMailer : IMailer
{
    private readonly string _path;

    public FileSmtpMailer(string path)
    {
        if (path == null) throw new ArgumentNullException("path");
        _path = path;
    }

    public void Send(string message, string sender, string recipients, string subject)
    {
        using (var client = new SmtpClient())
        {
            // This can be configured in the app.config
            client.DeliveryMethod = SmtpDeliveryMethod.SpecifiedPickupDirectory;
            client.PickupDirectoryLocation = _path;

            using (var mailMessage = new MailMessage(sender, recipients, subject, message))
            {
                mailMessage.IsBodyHtml = true;
                client.Send(mailMessage);
            }
        }
    }
}

public class MailProcessorCommand : ICommand&lt;OrderMessage&gt;
{
    private readonly IMailer _mailer;

    public MailProcessorCommand(IMailer mailer)
    {
        if (mailer == null) throw new ArgumentNullException("mailer");
        _mailer = mailer;
    }

    public void Execute(OrderMessage message)
    {
        if (message == null) throw new ArgumentNullException("message");
        _mailer.Send(message.Body, message.Sender, message.Recipients, message.Subject);
    }
}
</pre>
<h3>Instrumentation</h3>
<p>The TimerCommandDecorator will be placed between the QueueMonitor and the MailProcessorCommand in order to measure throughput. 
</p>
<pre class="brush: c-sharp;">
public class TimerCommandDecorator&lt;T&gt; : ICommand&lt;T&gt;
{
    private readonly ICommand&lt;T&gt; _next;
    private int _messages;
    private Stopwatch _stopwatch;

    public TimerCommandDecorator(ICommand&lt;T&gt; next)
    {
        if (next == null) throw new ArgumentNullException("next");
        _next = next;
    }

    public void Execute(T message)
    {
        // Start timing when the first message arrives
        if (_stopwatch == null) _stopwatch = Stopwatch.StartNew();

        _next.Execute(message);
        _messages += 1;

        Console.WriteLine("Processing {0} messages took {1} sec", _messages, _stopwatch.Elapsed.TotalSeconds);
    }
}
</pre>
<p>The TimerCommandDecorator implementation can easily be swapped out with a perfmon version that increments a performance counter for each message.</p>
<h3>Single vs Competing Consumers</h3>
<p>Let's run the experiment by processing 1000 messages and observing the throughput difference between the single processor and the competing consumers.</p>
<p>The single processor version:</p>
<pre class="brush: c-sharp;">
using (var queue = new MessageQueue(QueuePath))
{
    queue.Formatter = new BinaryMessageFormatter();

    var message = new OrderMessage()
    {
        Recipients = "recipient@mail.com",
        Sender = "admin@mail.com",
        Subject = "Hello World",
        Body = emailBody // loaded from a file
    };

    // Send messages to the queue
    for (var i = 0; i &lt; 1000; i++)
    {
        queue.Send(message);
    }

    var fileMailer = new FileSmtpMailer(@"C:\temp");
    var command = new MailProcessorCommand(fileMailer);
    var timerCommand = new TimerCommandDecorator&lt;OrderMessage&gt;(command);
    var queueMonitor = new QueueMonitor&lt;OrderMessage&gt;(queue, timerCommand);
    queueMonitor.Start();
}
</pre>
<pre>Processing 1000 messages took 47.3004132 sec
</pre>
<p>The competing consumers version with 10 processors:</p>
<pre class="brush: c-sharp;">
// Create 10 processors
var commands = new List&lt;ICommand&lt;OrderMessage&gt;&gt;();
for (var i = 0; i &lt; 10; i++)
{
    commands.Add(new MailProcessorCommand(fileMailer));
}

var command = new CompositeCompetingCommandProcessor&lt;OrderMessage&gt;(commands);
var timerCommand = new TimerCommandDecorator&lt;OrderMessage&gt;(command);
var queueMonitor = new QueueMonitor&lt;OrderMessage&gt;(queue, timerCommand);
queueMonitor.Start();
</pre>
<pre>Processing 1000 messages took 16.241723 sec
</pre>
<div class="alert alert-success" role="alert"> <b>Success!</b> The competing consumers pattern processed the mail queue roughly 3 times faster (16.2 sec compared with 47.3 sec). </div>
<p>We could experiment with different numbers of concurrent receivers to determine how performance is affected.</p>
<div class="alert alert-warning" role="alert"> <b>Note:</b> The disk was saturated with requests and was running at 100% capacity. </div>
<p>The competing consumers pattern is a great option to fully utilize resources, but it may have a negative impact on other tenants of the system. For example, if we call an external API with too many requests, it may look like a denial-of-service attack.</p>
<h3>Summary</h3>
<p>The advantage of the competing consumers pattern is the flexibility to configure the number of concurrent consumers to produce a balanced outcome. It is, however, difficult to determine the throughput and performance impact without instrumentation in place to measure resource utilization.</p>
<p>An approach was covered to add performance instrumentation. 
The performance metrics revealed that queue processing throughput can be improved significantly by using the competing consumers queue processor instead of the single queue processor.</p>
/post/push-queue-processing-boundaries-with-3x-greater-throughput jay@webdevelopment.co.nz Sat, 26 Dec 2015 10:54:00 +1300 Decorator Pattern Queues .NET C# Decorator Design Pattern Jay Strydom

Dispatching messages with the Competing Consumers Pattern
<img class="img-responsive" alt="Broadcasting Messages" src="/pics/banners/CompetingConsumersBlog.jpg"> <br>
<p>The <a href="/post/broadcasting-messages-using-the-composite-pattern">previous post</a> focused on fanning out the same queued message to multiple consumers.</p>
<p><b>The problem</b> is that a single worker can only process one message at a time. How do we increase throughput by processing messages concurrently?</p>
<p><b>The solution</b> is to use the Competing Consumers Pattern, which enables multiple concurrent workers to process messages received in the same queue. This approach will optimise throughput and balance the workload.</p>
<p>For example, travellers often queue at the airport check-in. Checking in would be a lengthy process if there was a long queue but only one check-in operator. 
The solution is to employ multiple operators to service the same queue to process it faster.</p>
<a class="btn btn-primary btn-sm" role="button" href="/Downloads/MessageQueueExampleCompeting.zip">Download Source Code</a>
<h3>Setup</h3>
<p>The core components such as the Queue Monitor were covered in the <a href="/post/message-queue-delivery-strategies">Message Queue Delivery Strategies</a> post. The queue monitor is responsible for reading messages from the queue and pushing the message content to a command to execute.</p>
<p>The OrderProcessorCommand class used in the previous post was modified (see the Thread.Sleep call below) to simulate each worker taking a different duration to process a request. For example, worker 1 will take 1 second to process a request whereas worker 2 will take 2 seconds to process a request.</p>
<pre class="brush: c-sharp;">
public interface ICommand&lt;in T&gt;
{
    void Execute(T message);
}

public class OrderProcessorCommand : ICommand&lt;OrderModel&gt;
{
    private readonly int _id;

    public OrderProcessorCommand(int id)
    {
        _id = id;
    }

    public void Execute(OrderModel message)
    {
        if (message == null) throw new ArgumentNullException("message");
        var start = DateTime.Now;

        // Simulate work: worker N takes N seconds per request
        Thread.Sleep(TimeSpan.FromSeconds(_id));

        Console.WriteLine("Processed {0} by worker {1} from {2} to {3}",
            message.Name, _id, start.ToString("h:mm:ss"), DateTime.Now.ToString("h:mm:ss"));
    }
}
</pre>
<h3>Competing Consumers</h3>
<p>The pool of available commands is orchestrated using the CompositeCompetingCommandProcessor class.</p>
<p>The class will automatically block the queue monitor (at the end of its Execute method) from retrieving and pushing in the next message when all of the command processors are busy. 
The queue monitor will be unblocked as soon as the first command processor becomes available.</p>
<p>The WorkingCommandModel is used to associate a command with a task.</p>
<pre class="brush: c-sharp;">
public class WorkingCommandModel&lt;T&gt;
{
    public ICommand&lt;T&gt; Command { get; set; }
    public Task Task { get; set; }
}

public class CompositeCompetingCommandProcessor&lt;T&gt; : ICommand&lt;T&gt;
{
    private readonly IEnumerable&lt;WorkingCommandModel&lt;T&gt;&gt; _workers;

    public CompositeCompetingCommandProcessor(IEnumerable&lt;ICommand&lt;T&gt;&gt; commands)
    {
        if (commands == null) throw new ArgumentNullException("commands");
        _workers = commands.Select(
            c =&gt; new WorkingCommandModel&lt;T&gt; { Command = c }).ToList();
    }

    public void Execute(T message)
    {
        // Wait until a worker is idle
        WorkingCommandModel&lt;T&gt; worker = null;
        while (worker == null)
        {
            worker = GetAvailableWorker();
            if (worker == null) WaitForWorkerAvailability();
        }

        // Run the command on its own task
        worker.Task = Task.Factory.StartNew(() =&gt;
            worker.Command.Execute(message), TaskCreationOptions.LongRunning);

        // Block new messages from arriving until a worker is available
        if (GetAvailableWorker() == null) WaitForWorkerAvailability();
    }

    private void WaitForWorkerAvailability()
    {
        var tasks = (from w in _workers where w.Task != null select w.Task);
        Task.WaitAny(tasks.ToArray());
    }

    private WorkingCommandModel&lt;T&gt; GetAvailableWorker()
    {
        // A worker is available if it has never run or its task has completed
        var worker = _workers.FirstOrDefault(w =&gt;
            w.Task == null || w.Task.IsCompleted);
        if (worker != null &amp;&amp; worker.Task != null) worker.Task.Dispose();
        return worker;
    }
}
</pre>
Let's run the solution using 3 workers to process 9 unique messages in the queue. 
<pre class="brush: c-sharp;">
using (var queue = new MessageQueue(QueuePath))
{
    queue.Formatter = new BinaryMessageFormatter();

    // Write 9 messages to the queue
    for (var orderId = 1; orderId &lt;= 9; orderId++)
    {
        var order = new OrderModel()
        {
            Id = orderId,
            Name = string.Format("Order {0}", orderId)
        };
        queue.Send(order);
    }

    // Create 3 workers
    var workers = new List&lt;ICommand&lt;OrderModel&gt;&gt;();
    for (var workerId = 1; workerId &lt;= 3; workerId++)
    {
        workers.Add(new OrderProcessorCommand(workerId));
    }

    // Process the queue
    var command = new CompositeCompetingCommandProcessor&lt;OrderModel&gt;(workers);
    var queueMonitor = new QueueMonitor&lt;OrderModel&gt;(queue, command);
    queueMonitor.Start();
}
</pre>
<pre>Processed Order 1 by worker 1 from 6:42:48 to 6:42:49
Processed Order 2 by worker 2 from 6:42:48 to 6:42:50
Processed Order 4 by worker 1 from 6:42:49 to 6:42:50
Processed Order 3 by worker 3 from 6:42:48 to 6:42:51
Processed Order 6 by worker 1 from 6:42:50 to 6:42:51
Processed Order 5 by worker 2 from 6:42:50 to 6:42:52
Processed Order 8 by worker 1 from 6:42:51 to 6:42:52
Processed Order 7 by worker 3 from 6:42:51 to 6:42:54
Processed Order 9 by worker 2 from 6:42:52 to 6:42:54
</pre>
<p>As we can see from the results above, worker 1 processed 4 messages whereas worker 3 only processed 2 messages.</p>
<div class="alert alert-info" role="alert"> <b>Warning:</b> The message order is no longer guaranteed. </div>
<p>The order can sometimes be important. For example, a reservation system may process new bookings, updates and cancellations. Let's say the traveller creates a booking and decides to cancel it immediately. 
When both requests are in a queue processed by multiple workers, the cancellation may be processed before the booking is created.</p>
<h3>Summary</h3>
<p>A practical example was provided to process messages from a single queue using multiple workers by applying the competing consumers pattern.</p>
<p>The drawback of potentially processing messages out of order was briefly covered. With enough interest, I may create a follow-up post to address the ordering issue using techniques such as a sequential message convoy, sessions and correlation IDs to preserve delivery order.</p>
<p>The <a href="/post/push-queue-processing-boundaries-with-3x-greater-throughput">next post</a> will illustrate how to add instrumentation to monitor queue processing throughput. The performance of the competing consumers solution is compared with the single worker solution.</p>
/post/dispatching-messages-with-the-competing-consumers-pattern jay@webdevelopment.co.nz Thu, 24 Dec 2015 23:27:00 +1300 Competing Consumers Pattern Queues Design Patterns Queues C# .NET Jay Strydom

Broadcasting messages using the composite pattern
<img src="/pics/banners/MessagePatternsBlog.jpg" class="img-responsive" alt="Broadcasting Messages"> <br>
<p>The <a href="/post/message-queue-delivery-strategies">previous post</a> focused on reading messages from a queue and pushing the messages to a single command for processing.</p>
<p><b>The problem</b> is how to notify multiple commands to process the same request. 
For example, when an order is received, we may want a command to 1) send an email, 2) add an audit record and 3) call an external API.</p>
<p><b>The solution</b> is to use the composite pattern, whereby a group of commands is treated in the same way as a single command, to fan out the messages.</p>
<a href="/Downloads/MessageQueueExampleBroadcasting.zip" role="button" class="btn btn-primary btn-sm">Download Source Code</a>
<h3>Setup</h3>
<p>The core components such as the Queue Monitor were covered in the <a href="/post/message-queue-delivery-strategies">Message Queue Delivery Strategies</a> post.</p>
<h3>Sequential Processing</h3>
<p>The CompositeSequentialBroadcastCommand class below implements ICommand to indicate that it exhibits the behaviour of a command. The responsibility of the composite class is to loop through a collection of commands and call each command with the same request.</p>
<pre class="brush: c-sharp;">
public interface ICommand&lt;in T&gt;
{
    void Execute(T message);
}

public class OrderProcessorCommand : ICommand&lt;OrderModel&gt;
{
    private readonly int _id;

    public OrderProcessorCommand(int id)
    {
        _id = id;
    }

    public void Execute(OrderModel message)
    {
        if (message == null) throw new ArgumentNullException("message");
        var start = DateTime.Now;

        // Simulate 2 seconds of work per request
        Thread.Sleep(TimeSpan.FromSeconds(2));

        Console.WriteLine("Processed {0} by worker {1} from {2} to {3}",
            message.Name, _id, start.ToString("h:mm:ss"), DateTime.Now.ToString("h:mm:ss"));
    }
}

public class CompositeSequentialBroadcastCommand&lt;T&gt; : ICommand&lt;T&gt;
{
    private readonly IEnumerable&lt;ICommand&lt;T&gt;&gt; _commands;

    public CompositeSequentialBroadcastCommand(IEnumerable&lt;ICommand&lt;T&gt;&gt; commands)
    {
        if (commands == null) throw new ArgumentNullException("commands");
        _commands = commands.ToList();
    }

    public void Execute(T message)
    {
        // Call each command in turn with the same message
        foreach (var command in _commands)
        {
            command.Execute(message);
        }
    }
}
</pre>
Let's see what happens when 3 sequential workers are configured to process 2 
messages.
<pre class="brush: c-sharp;">
using (var queue = new MessageQueue(@".\Private$\Orders"))
{
    queue.Formatter = new BinaryMessageFormatter();

    var workers = new List&lt;ICommand&lt;OrderModel&gt;&gt;();
    for (var workerId = 1; workerId &lt;= 3; workerId++)
    {
        workers.Add(new OrderProcessorCommand(workerId));
    }

    var command = new CompositeSequentialBroadcastCommand&lt;OrderModel&gt;(workers);
    var queueMonitor = new QueueMonitor&lt;OrderModel&gt;(queue, command);
    queueMonitor.Start();
}
</pre>
<pre>Processed Order 1 by worker 1 from 6:26:34 to 6:26:36
Processed Order 1 by worker 2 from 6:26:36 to 6:26:38
Processed Order 1 by worker 3 from 6:26:38 to 6:26:40
Processed Order 2 by worker 1 from 6:26:40 to 6:26:42
Processed Order 2 by worker 2 from 6:26:42 to 6:26:44
Processed Order 2 by worker 3 from 6:26:44 to 6:26:46
</pre>
<p>The output above shows that each worker processed the same message in sequence. If worker 2 failed by throwing an exception, worker 3 would not be executed. This can be a great pattern when the sequence is important. For example, only send the welcome email if the order has been created successfully.</p>
<h3>Parallel Processing</h3>
<p>The CompositeParallelBroadcastCommand class is similar to the previous example. The only difference is that this class will call the commands in parallel.</p>
<pre class="brush: c-sharp;">
class CompositeParallelBroadcastCommand&lt;T&gt; : ICommand&lt;T&gt;
{
    private readonly IEnumerable&lt;ICommand&lt;T&gt;&gt; _commands;

    public CompositeParallelBroadcastCommand(IEnumerable&lt;ICommand&lt;T&gt;&gt; commands)
    {
        if (commands == null) throw new ArgumentNullException("commands");
        _commands = commands.ToList();
    }

    public void Execute(T message)
    {
        // Call all commands concurrently and wait for them to finish
        Parallel.ForEach(_commands, c =&gt; c.Execute(message));
    }
}
</pre>
Let's see what happens when 3 parallel workers are configured to process 2 messages. 
<pre class="brush: c-sharp;">
using (var queue = new MessageQueue(@".\Private$\Orders"))
{
    queue.Formatter = new BinaryMessageFormatter();

    var workers = new List&lt;ICommand&lt;OrderModel&gt;&gt;();
    for (var workerId = 1; workerId &lt;= 3; workerId++)
    {
        workers.Add(new OrderProcessorCommand(workerId));
    }

    var command = new CompositeParallelBroadcastCommand&lt;OrderModel&gt;(workers);
    var queueMonitor = new QueueMonitor&lt;OrderModel&gt;(queue, command);
    queueMonitor.Start();
}
</pre>
<pre>Processed Order 1 by worker 2 from 6:08:36 to 6:08:38
Processed Order 1 by worker 1 from 6:08:36 to 6:08:38
Processed Order 1 by worker 3 from 6:08:36 to 6:08:38
Processed Order 2 by worker 3 from 6:08:38 to 6:08:40
Processed Order 2 by worker 2 from 6:08:38 to 6:08:40
Processed Order 2 by worker 1 from 6:08:38 to 6:08:40
</pre>
<p>The output above shows that each worker processed the same message concurrently. This can be a great pattern when sequencing doesn’t matter. For example, send a welcome email and add an audit record at the same time.</p>
<h3>Summary</h3>
<p>This post focused on broadcasting a single message to multiple receivers. The composite pattern allows the behaviour of the Queue Monitor push to change without modifying the Queue Monitor's code.</p>
<b>Coming soon:</b> The next post will focus on processing a single request by dispatching messages to a pool of competing consumer processors.
/post/broadcasting-messages-using-the-composite-pattern jay@webdevelopment.co.nz Mon, 21 Dec 2015 23:59:00 +1300 Command Pattern Composite Pattern Queues Command Pattern Composite Pattern Queues Design Patterns Jay Strydom