Testing Azure ServiceBus Queues
Here's a quick example of using Microsoft Azure's ServiceBus message queue to decouple communication between two applications.
In this case it's two console applications for ease of demonstration, although you would never (I hope) use cloud infrastructure for on-premises communication.
In the Write Corner...
...an application that connects to the message bus using Microsoft.ServiceBus.Messaging.MessagingFactory.CreateFromConnectionString and sends Microsoft.ServiceBus.Messaging.BrokeredMessage instances using the Microsoft.ServiceBus.Messaging.MessageSender class.
Writer
using Microsoft.ServiceBus;
using Microsoft.ServiceBus.Messaging;
using System;

class Program
{
    static void Main(string[] args)
    {
        Console.WriteLine("Sending Messages");
        string connectionString = "<Your Endpoint Here>";
        string queueName = "sample-queue";

        // Create the queue on first run if it does not exist yet.
        var nsm = NamespaceManager.CreateFromConnectionString(connectionString);
        if (!nsm.QueueExists(queueName))
        {
            QueueDescription description = new QueueDescription(queueName);
            nsm.CreateQueue(description);
        }

        MessagingFactory factory = MessagingFactory.CreateFromConnectionString(connectionString);
        MessageSender messageSender = factory.CreateMessageSender(queueName);

        const int TotalMessages = 100;
        Random randomWait = new Random();
        for (int i = 0; i < TotalMessages; ++i)
        {
            string messageText = string.Format("Message {0}", i + 1);
            Console.WriteLine("Sending " + messageText);

            using (BrokeredMessage message = new BrokeredMessage(messageText))
            {
                // Alternate a custom "Say" property so the reader has something to print.
                message.Properties.Add("Say", ((i % 2) == 0) ? "hello" : "goodbye");
                messageSender.Send(message);
            }

            // Pause for 1 to 9 seconds (the upper bound of Next is exclusive).
            TimeSpan wait = TimeSpan.FromSeconds(randomWait.Next(1, 10));
            Console.WriteLine("Waiting for " + wait);
            System.Threading.Thread.Sleep(wait);
        }

        // Release the connection before exiting.
        messageSender.Close();
        factory.Close();

        Console.WriteLine("Done, press a key to continue...");
        Console.ReadKey();
    }
}
In the Read Corner...
...an application that reads from the message bus using the Microsoft.ServiceBus.Messaging.MessageReceiver class.
Reader
using Microsoft.ServiceBus.Messaging;
using System;
using System.Collections.Generic;
using System.Linq;

class Program
{
    static void Main(string[] args)
    {
        Console.WriteLine("Retrieving Messages...");
        string connectionString = "<Your Endpoint Here>";
        string queueName = "sample-queue";
        MessagingFactory factory = MessagingFactory.CreateFromConnectionString(connectionString);

        // PeekLock: a message is only removed from the queue once Complete() is called on it.
        MessageReceiver messageReceiver = factory.CreateMessageReceiver(queueName, ReceiveMode.PeekLock);

        // Sequence numbers of the messages this instance has deferred.
        List<long> deferredMessages = new List<long>();
        Random randomWait = new Random();

        while (true)
        {
            try
            {
                if (deferredMessages.Any())
                {
                    Console.WriteLine("Processing deferred messages...");

                    // Iterate over a copy so entries can be removed as they are handled.
                    foreach (long sequenceNumber in deferredMessages.ToList())
                    {
                        // A deferred message can only be received by its sequence number.
                        using (BrokeredMessage retrievedMessage = messageReceiver.Receive(sequenceNumber))
                        {
                            if (retrievedMessage != null)
                            {
                                if (retrievedMessage.Properties.ContainsKey("Say"))
                                {
                                    Console.WriteLine(retrievedMessage.Properties["Say"].ToString());
                                }
                                retrievedMessage.Complete();
                            }
                        }

                        // Forget the number once handled, so a failure on a later
                        // message does not make the next pass retry this one.
                        deferredMessages.Remove(sequenceNumber);
                    }
                }

                // Wait up to one second for the next message; null means none arrived.
                using (BrokeredMessage originalMessage = messageReceiver.Receive(TimeSpan.FromSeconds(1)))
                {
                    try
                    {
                        if (originalMessage != null)
                        {
                            // defer some messages
                            if ((originalMessage.SequenceNumber % 2) == 0)
                            {
                                Console.WriteLine("{0} - Deferred", originalMessage.SequenceNumber);
                                originalMessage.Defer();
                                deferredMessages.Add(originalMessage.SequenceNumber);
                            }
                            else
                            {
                                if (originalMessage.Properties.ContainsKey("Say"))
                                {
                                    Console.WriteLine(originalMessage.Properties["Say"].ToString());
                                }
                                Console.WriteLine("{0} - Complete", originalMessage.SequenceNumber);
                                originalMessage.Complete();
                            }
                        }
                        else
                        {
                            Console.Write(".");
                        }
                    }
                    catch (Exception ex)
                    {
                        Console.WriteLine(ex.ToString());
                    }
                }
            }
            catch (Exception ex)
            {
                Console.WriteLine("Exception {0}", ex.Message);
            }

            // Pause for 1 to 9 seconds before polling again.
            TimeSpan wait = TimeSpan.FromSeconds(randomWait.Next(1, 10));
            System.Threading.Thread.Sleep(wait);
        }
    }
}
Totally, like, random
In both applications I have introduced a bit of randomness in sending and receiving to demonstrate that you can run more than one instance of the reader application, and each instance should pick up and process its own share of messages without interfering with the others.
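If you want to see the "each reader gets its own share" idea without an Azure subscription, here is a minimal local sketch. It assumes an in-memory ConcurrentQueue<int> as a stand-in for the Service Bus queue, so it only illustrates the pattern and says nothing about how the service implements it. The names CompetingReadersSketch and Run are made up for this example and are not part of the Service Bus SDK.

```csharp
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading.Tasks;

static class CompetingReadersSketch
{
    // Drains messageCount messages with readerCount concurrent readers and
    // returns how many messages each reader handled.
    public static int[] Run(int messageCount, int readerCount)
    {
        // Assumption: a local in-memory queue, not the Service Bus client.
        var queue = new ConcurrentQueue<int>(Enumerable.Range(1, messageCount));
        var handled = new int[readerCount];

        Task[] readers = Enumerable.Range(0, readerCount)
            .Select(id => Task.Run(() =>
            {
                int message;
                // TryDequeue hands each message to exactly one caller.
                while (queue.TryDequeue(out message))
                {
                    // Each reader only ever writes to its own slot.
                    handled[id]++;
                }
            }))
            .ToArray();

        Task.WaitAll(readers);
        return handled;
    }
}
```

Calling CompetingReadersSketch.Run(100, 3) returns three counts that always add up to 100. How the 100 are split between the readers varies from run to run, which is the same effect you should see when running several copies of the reader application.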
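This sample also demonstrates the use of the Defer method, which lets an instance set a message aside, complete some other process, then come back to the message, retrieve it from the queue by its sequence number and mark it as complete. A deferred message stays in the queue but is skipped by normal receives, which is why the reader keeps a list of the SequenceNumber values it has deferred.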
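The deferral behaviour can be hard to picture, so here is a toy model of it, assuming nothing more than two in-memory collections. DeferralModel and its members are hypothetical names for illustration only. The model leaves out locks, lock expiry and Complete, and it is not how the Service Bus client is implemented.

```csharp
using System;
using System.Collections.Generic;

// Toy model of deferral: a local stand-in, not the Service Bus client.
class DeferralModel
{
    private readonly Queue<long> active = new Queue<long>();
    private readonly HashSet<long> deferred = new HashSet<long>();

    // Adds a message, identified here only by its sequence number.
    public void Send(long sequenceNumber)
    {
        active.Enqueue(sequenceNumber);
    }

    // Normal receive: returns the next active message, or null when none is left.
    // Deferred messages are never returned from here.
    public long? Receive()
    {
        return active.Count > 0 ? active.Dequeue() : (long?)null;
    }

    // Sets a received message aside; it is kept, but hidden from Receive().
    public void Defer(long sequenceNumber)
    {
        deferred.Add(sequenceNumber);
    }

    // Retrieval by sequence number: the only way to get a deferred message back.
    // Returns false when no deferred message has that number.
    public bool ReceiveDeferred(long sequenceNumber)
    {
        return deferred.Remove(sequenceNumber);
    }
}
```

In this model, if you send messages 1 and 2, receive 1 and defer it, the next Receive() gives you 2 and the one after that gives null. Message 1 only comes back through ReceiveDeferred(1), which mirrors the reader's call to messageReceiver.Receive(sequenceNumber).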