Testing Azure Service Bus Queues

Here's a quick example of using a Microsoft Azure Service Bus queue to decouple communication between two applications.

In this case it's two console applications, for ease of demonstration, although you would never (I hope) use cloud infrastructure for on-premises communication.

In the Write Corner...

...an application that connects to the message bus using Microsoft.ServiceBus.Messaging.MessagingFactory.CreateFromConnectionString and sends Microsoft.ServiceBus.Messaging.BrokeredMessage instances using the Microsoft.ServiceBus.Messaging.MessageSender class.

Writer


using Microsoft.ServiceBus;
using Microsoft.ServiceBus.Messaging;
using System;

class Program
{
    static void Main(string[] args)
    {
        Console.WriteLine("Sending Messages");

        string connectionString = "<Your Endpoint Here>";
        string queueName = "sample-queue";

        var nsm = NamespaceManager.CreateFromConnectionString(connectionString);

        if (!nsm.QueueExists(queueName))
        {
            QueueDescription description = new QueueDescription(queueName);
            nsm.CreateQueue(description);
        }

        MessagingFactory factory = MessagingFactory.CreateFromConnectionString(connectionString);
        MessageSender messageSender = factory.CreateMessageSender(queueName);

        const int TotalMessages = 100;

        Random randomWait = new Random();

        for (int i = 0; i < TotalMessages; ++i)
        {
            string messageText = string.Format("Message {0}", i + 1);
            Console.WriteLine("Sending message " + messageText);

            // BrokeredMessage is disposable, so release it once it has been sent.
            using (BrokeredMessage message = new BrokeredMessage(messageText))
            {
                message.Properties.Add("Say", ((i % 2) == 0) ? "hello" : "goodbye");

                messageSender.Send(message);
            }

            TimeSpan wait = TimeSpan.FromSeconds(randomWait.Next(1, 10));

            Console.WriteLine("Waiting for " + wait);
            System.Threading.Thread.Sleep(wait);
        }

        Console.WriteLine("Done, press a key to continue...");
        Console.ReadKey();
    }
}

In the Read Corner...

...an application that reads from the message bus using the Microsoft.ServiceBus.Messaging.MessageReceiver class.

Reader


using Microsoft.ServiceBus.Messaging;
using System;
using System.Collections.Generic;
using System.Linq;

class Program
{
    static void Main(string[] args)
    {
        Console.WriteLine("Retrieving Messages...");

        string connectionString = "<Your Endpoint Here>";
        string queueName = "sample-queue";

        MessagingFactory factory = MessagingFactory.CreateFromConnectionString(connectionString);
        MessageReceiver messageReceiver = factory.CreateMessageReceiver(queueName, ReceiveMode.PeekLock);

        List<long> deferredMessages = new List<long>();

        Random randomWait = new Random();

        while (true)
        {
            try
            {
                if (deferredMessages.Any())
                {
                    Console.WriteLine("Processing deferred messages...");

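                    // Iterate over a copy so each entry can be removed once it has been handled.
                    // If a receive throws part-way through, only the unprocessed sequence numbers
                    // remain in the list for the next pass.
                    foreach (long sequenceNumber in deferredMessages.ToList())
                    {
                        // A deferred message can only be received by its sequence number.
                        using (BrokeredMessage retrievedMessage = messageReceiver.Receive(sequenceNumber))
                        {
                            if (retrievedMessage != null)
                            {
                                if (retrievedMessage.Properties.ContainsKey("Say"))
                                {
                                    Console.WriteLine(retrievedMessage.Properties["Say"].ToString());
                                }

                                retrievedMessage.Complete();
                            }
                        }

                        deferredMessages.Remove(sequenceNumber);
                    }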
                }

                using (BrokeredMessage originalMessage = messageReceiver.Receive(TimeSpan.FromSeconds(1)))
                {
                    try
                    {
                        if (originalMessage != null)
                        {
                            // defer some messages
                            if ((originalMessage.SequenceNumber % 2) == 0)
                            {
                                Console.WriteLine("{0} - Deferred", originalMessage.SequenceNumber);
                                originalMessage.Defer();
                                deferredMessages.Add(originalMessage.SequenceNumber);
                            }
                            else
                            {
                                if (originalMessage.Properties.ContainsKey("Say"))
                                {
                                    Console.WriteLine(originalMessage.Properties["Say"].ToString());
                                }

                                Console.WriteLine("{0} - Complete", originalMessage.SequenceNumber);
                                originalMessage.Complete();
                            }
                        }
                        else
                        {
                            Console.Write(".");
                        }
                    }
                    catch (Exception ex)
                    {
                        Console.WriteLine(ex.ToString());
                    }
                }
            }
            catch (Exception ex)
            {
                Console.WriteLine("Exception {0}", ex.Message);
            }

            TimeSpan wait = TimeSpan.FromSeconds(randomWait.Next(1, 10));
            System.Threading.Thread.Sleep(wait);
        }
    }
}

Totally, like, random

In both applications I have introduced a bit of randomness in sending and receiving. This is to demonstrate that you can run more than one instance of the reader application, and each instance should pick up and process its own share of the messages without interfering with the others.
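To make the "own share" idea concrete without needing an Azure namespace, here is a minimal in-memory sketch. It is only an analogy: CompetingReadersSketch and its Run method are hypothetical names, and a ConcurrentQueue stands in for the Service Bus queue. With the real queue in PeekLock mode the effect is similar because a received message is locked to the receiver that picked it up.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

// In-memory stand-in for the queue; this is NOT the Service Bus API.
static class CompetingReadersSketch
{
    // Runs readerCount readers against one shared queue of messageCount messages and
    // returns, per reader index, the message numbers that reader processed.
    public static Dictionary<int, List<int>> Run(int messageCount, int readerCount)
    {
        var queue = new ConcurrentQueue<int>(Enumerable.Range(1, messageCount));
        var processed = new Dictionary<int, List<int>>();
        var readers = new Task[readerCount];

        for (int r = 0; r < readerCount; ++r)
        {
            var mine = new List<int>(); // only ever written by this reader's task
            processed[r] = mine;
            int seed = r;               // distinct seed so the readers wait differently

            readers[r] = Task.Run(() =>
            {
                var randomWait = new Random(seed);
                int message;

                // TryDequeue hands each message to exactly one reader.
                while (queue.TryDequeue(out message))
                {
                    mine.Add(message);
                    Thread.Sleep(randomWait.Next(1, 10)); // uneven work, in milliseconds
                }
            });
        }

        Task.WaitAll(readers);
        return processed;
    }
}
```

Calling CompetingReadersSketch.Run(20, 2) returns two lists whose contents vary from run to run, but between them they always hold each of the 20 message numbers exactly once.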

This sample also demonstrates the Defer method, which lets an instance set a message aside, complete some other process, then come back to the message, retrieve it from the queue by its sequence number and mark it as complete.
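The deferral behaviour can be pictured with a toy in-memory model. This is a rough sketch and not the Service Bus implementation: DeferralSketchQueue, ReceiveDeferred and DeferralDemo are hypothetical names chosen for illustration. It mirrors the two things the reader relies on: a deferred message is skipped by a normal receive, and it comes back only when asked for by sequence number.

```csharp
using System;
using System.Collections.Generic;

// Toy model of deferral; the class and its members are hypothetical.
class DeferralSketchQueue
{
    private readonly Queue<KeyValuePair<long, string>> active = new Queue<KeyValuePair<long, string>>();
    private readonly Dictionary<long, string> deferred = new Dictionary<long, string>();
    private long nextSequenceNumber = 1;

    // Enqueues a message and returns the sequence number assigned to it.
    public long Send(string body)
    {
        long sequenceNumber = nextSequenceNumber++;
        active.Enqueue(new KeyValuePair<long, string>(sequenceNumber, body));
        return sequenceNumber;
    }

    // Normal receive: the next active message, or null when there is none.
    // Deferred messages are never returned from here.
    public KeyValuePair<long, string>? Receive()
    {
        if (active.Count == 0) return null;
        return active.Dequeue();
    }

    // Sets a received message aside; it is kept, but Receive() will skip it.
    public void Defer(KeyValuePair<long, string> message)
    {
        deferred[message.Key] = message.Value;
    }

    // Receive by sequence number, the only way to get a deferred message back.
    // Removing it here stands in for receiving it and then completing it.
    public string ReceiveDeferred(long sequenceNumber)
    {
        string body;
        if (!deferred.TryGetValue(sequenceNumber, out body)) return null;
        deferred.Remove(sequenceNumber);
        return body;
    }
}

static class DeferralDemo
{
    public static void Run()
    {
        var queue = new DeferralSketchQueue();
        queue.Send("hello");   // sequence number 1
        queue.Send("goodbye"); // sequence number 2

        KeyValuePair<long, string> first = queue.Receive().Value;
        queue.Defer(first);    // set message 1 aside

        Console.WriteLine(queue.Receive().Value.Value);      // prints "goodbye"
        Console.WriteLine(queue.Receive() == null);          // prints "True": message 1 is not redelivered
        Console.WriteLine(queue.ReceiveDeferred(first.Key)); // prints "hello"
    }
}
```

One consequence worth keeping in mind: the reader above holds its deferred sequence numbers in an in-memory list, so an instance that stops before working through that list no longer knows which messages it set aside.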