| 在本文的前两篇文章里对MSMQ的相关知识点进行了介绍,很多阅读过这前两篇文章的朋友都曾问到过这样一些问题:  1、如何把MSMQ应用到实际的项目中去呢?  2、可不可以介绍一个实际的应用实例?  3、......  在前两篇文章里,关于MSMQ常用的技术点基本介绍完毕了,本文主要以MS开源项目PetShop中的MSMQ应用作为案例来介绍MSMQ在实际项目中的应用。在PetShop里,由于系统使用了多线程的专用应用程序来监控消息队列,在进入PetShop应用分析前,我们先来了解下关于多线程和MSMQ的相关知识点。 一、多线程和MSMQ  现在有这样一个需求,指定的消息队列里不管有无消息数据,我们通过一个多线程来监控这个队列,一但队列里的数据发生变化就做出相应的处理,比如把消息读取出来。根据这个需求,我们来做个示例,用一多线程把队列监控起来,如果队列里有消息数据,就把消息读取出来,如果没有则一直监视队列,当队列数据发生改变(有新的消息加入)的时候就作出处理(读取消息)。  首先定义一个线程数组用于存储线程数;  
                          1 static private int ThreadNumber = 5;  //5个线程序 2
  static private Thread[] ThreadArray = new Thread[ThreadNumber]; 我们把需要启动的线程装载入ThreadArray数组,通过一个遍历数组把所以的线程启动,实际这里只有5个线程。  
                           1 private void button1_Click(object sender, EventArgs e) 2
    { 3
  StartThreads(); 4
  } 5
  6
  private void StartThreads() 7
    { 8
  int counter; //线程计数 9
  for (counter = 0; counter < ThreadNumber; counter++) 10
     { 11
  ThreadArray[counter] = new Thread(new ThreadStart(MSMQListen)); 12
  ThreadArray[counter].Start(); 13
  this.richTextBox2.Text += (counter + 1).ToString() + "号线程开始!"; 14
  } 15
  } 16
  17
  private void MSMQListen() 18
    { 19
  while (true) 20
     { 21
  //取出队列里的消息 22
  MessageBox.Show(MsgQueue.ReceiveMessage()); 23
  } 24
  } 如果上面这段代码阅读起存在问题,建议先去了解下多线程的相关知识点。在StartThreads方法里启动数组里存储的所以线程,并委托给MSMQListen方法进行处理,MSMQListen方法完成的就是读取队列里的消息,这里我使用了在第二篇文章里所使用的MsgQueue类和Book类,详细请阅读第二篇文章ASP.NET中进行消息处理(MSMQ) 
                          二 。    启动了5个线程,用来监视指定的消息队列,如上图。那好,我们现在就来测试一下,通过给队列里发送消息,看线程是否会有响应。从上面启动线程的代码上可以很清晰的看出,只要队列里有消息在多线程的监视下,线程就会把队列里的消息读取出来。  
                           1 private void button3_Click(object sender, EventArgs e) 2
    { 3
  Book book = new Book(); 4
  book.BookId = 1; 5
  book.BookName = "asp.net"; 6
  book.BookAuthor = "abcd"; 7
  book.BookPrice = 50.80; 8
  9
  MsgQueue.SendMessage(book); 10
  } 那么这里的测试,向队列里发送了一Book对象消息,根据上面分析,则多线程便会同时把此条消息读取出来: 
                          呵呵,测试结果出来,看来此测试达到了我们之前所提出的需求。
  !OK,关于MSMQ和多线程就简单介绍于此。
 二、MSMQ在开塬项目PetShop中的应用分析。  在PetShop 4.0中,利用消息队列临时存放要插入的数据,来避免因为频繁访问数据库的操作。而队列中的消息,则等待系统的专用的应用程序来处理,最后将数据插入到数据库中。  PetShop 4.0中的消息处理,主要分为下面几大部分:订单策略接口IOrderStategy、消息接口IMessageing、消息工厂MessageFactory、MSMQ实现MSMQMessaging、后台处理应用程序OrderProessor。如下图: 
 1、订单策略接口IOrderStategy PetShop 4.0的体系结构是非常庞大,在订单处理上有两种处理策略,这里也是策略模式的一个应用,IOrderStrategy接口作为订单策略的高层抽象,实现不同订单处理的具体策略去实现它,UML如下:  
 示意性代码:  
                           1 namespace PetShop.IBLLStrategy 2
    { 3
  public interface IOrderStrategy 4
     { 5
  void Insert(OrderInfo order); 6
  } 7
  } 8
  9
  namespace PetShop.BLL 10
    { 11
  public class OrderSynchronous:IOrderStrategy 12
     { 13
  private static readonly IOrder asynchOrder = QueueAccess.CreateOrder(); 14
  15
  public void Insert(OrderInfo order) 16
     { 17
  asynchOrder.Send(order); 18
  } 19
  } 20
  } 21
  22
  //   从上面UML和代码就可以看出,订单策略接口下有两种实现,使用了抽象工厂模式来完成相应的订单策略对象的创建 。关于这点在后面消息工厂部分去介绍,这里不作讲解。  2、消息接口IMessageing  在PetShop 4.0中,由于对订单处理使用了异步处理方式,在消息接口中仅定义了一个IOrder接口。IOrder接口的定义与MSMQ的实现是一致的,需要提供发送和接收操作。在Send方法中,参数为数据访问层的数据实体对象(OrderInfo),具体的实现则是用MSMQ的实现类(PetShop.MSMQMessaging.Order)去完成的。      MS的开发人员真的是什么都能想到,在消息接口的实现上考虑得很全面,为了避免将来的扩展会有其他的数据对象也使用到MSMQ;因此,在PetShop 
                          4.0中的消息接口实现中,定义了一个队列的基类(PetShopQueue),实现了消息的发送(Send)和接收(Receive)方法的基本操作。代码如下:  
                           1 namespace PetShop.MSMQMessaging 2
    { 3
  public class PetShopQueue:IDisposable 4
     { 5
  //指定消息队列事务的类型 6
  protected MessageQueueTransactionType transactionType = MessageQueueTransactionType.Automatic; 7
  protected MessageQueue queue;  //消息队列 8
  protected TimeSpan timeout;    //时间间隔 9
  10
  public PetShopQueue(string queuePath, int timeoutSeconds) 11
     { 12
  queue = new MessageQueue(queuePath);  //根据传入quueuPath创建队列 13
  timeout = TimeSpan.FromSeconds(Convert.ToDouble(timeoutSeconds)); 14
  15
  queue.DefaultPropertiesToSend.AttachSenderId = false; 16
  queue.DefaultPropertiesToSend.UseAuthentication = false; 17
  queue.DefaultPropertiesToSend.UseEncryption = false; 18
  queue.DefaultPropertiesToSend.AcknowledgeType = AcknowledgeTypes.None; 19
  queue.DefaultPropertiesToSend.UseJournalQueue = false; 20
  } 21
  22
   /**//// <summary> 23
  /// 接收消息方法 24
  /// </summary> 25
  public virtual object Receive() 26
     { 27
  try 28
     { 29
  using (Message message = queue.Receive(timeout, transactionType)) 30
  return message; 31
  } 32
  catch (MessageQueueException mqex) 33
     { 34
  if (mqex.MessageQueueErrorCode == MessageQueueErrorCode.IOTimeout) 35
  throw new TimeoutException(); 36
  throw; 37
  } 38
  } 39
  40
   /**//// <summary> 41
  /// 发送消息 42
  /// </summary> 43
  public virtual void Send(object msg) 44
     { 45
  queue.Send(msg, transactionType); 46
  } 47
  48
   IDisposable 成员#region IDisposable 成员 49
  public void Dispose() 50
     { 51
  queue.Dispose();  //解放资源 52
  } 53
  #endregion 54
  } 55
  } SMQ队列是一个可持久的队列,不会因用户不间断的下订单导致数据丢失。queue作为存放数据的队列,为消息队列(MessageQueue)类型,同时还为PetShopQueue设置了timeout值,后台处理应用程序(OrderProessor)会根据timeout的值定期扫描队列中的订单数量。  3、消息工厂MessageFactory  可能是考虑到IOrder的实现会改变不同的策略吧,在PetShop里利用了抽象工厂模式,将IOrder对象的创建用了专门的工厂模块(MessageFactory)进行封装,定义如下:  
                           1 namespace PetShop.MessagingFactory 2
    { 3
   /**//// <summary> 4
  /// This class is implemented following the Abstract Factory pattern to create the Order 5
  /// Messaging implementation specified from the configuration file 6
  /// </summary> 7
  public sealed class QueueAccess 8
     { 9
  //<add key="OrderMessaging" value="PetShop.MSMQMessaging"/> 10
  private static readonly string path = "PetShop.MSMQMessaging"; 11
  12
   /**//// <summary> 13
  /// 私有构造器,防止使用new创建对象实例 14
  /// </summary> 15
  private QueueAccess() 16
     { } 17
  18
  public static IOrder CreateOrder() 19
     { 20
  string className = path + ".Order"; 21
  return (IOrder)Assembly.Load(path).CreateInstance(className); 22
  } 23
  } 24
  } 在QueueAccess类中,通过CreateOrder方法利用反射技术创建正确IOrder类型对象(实际也就是创建了一个接口的具体实现类的对象,应用了多态的原理和反射技术)。UML图下:    在PetShop4.0中,消息接口的具体实现是通过配置文件定义在web.config里:  
                           <add key="OrderMessaging" value="PetShop.MSMQMessaging"/> 这里我为了能够更直观的演示和介绍就把path固化定义了,如下:  
                           private static readonly string path = "PetShop.MSMQMessaging"; 这里利用工厂模式来负责对象的创建,主要是方便业务逻辑层对定单处理策略的调用,如在PetShop.BLL模块中的OrderSynchronous类:  
                           1 namespace PetShop.BLL 2
    { 3
  public class OrderSynchronous:IOrderStrategy 4
     { 5
  private static readonly IOrder asynchOrder = QueueAccess.CreateOrder(); 6
  7
  public void Insert(OrderInfo order) 8
     { 9
  asynchOrder.Send(order); 10
  } 11
  } 12
  }  这样一但IOrder接口的实现发生了变化,此时就只需要修改配置文件就OK,整个系统就显得很灵活,稳定。  4、MSMQ实现MSMQMessaging  在PetShop.MSMQMessaging模块中,订单对象实现了消息接口(IMessaging)模块中的IOrder,同时还继承了基类PetShopQueue。定义如下:  
                           1 namespace PetShop.MSMQMessaging 2
    { 3
  public class Order:PetShopQueue,IOrder 4
     { 5
  private static readonly string queuePath = ConfigurationManager.AppSettings["OrderQueuePath"]; 6
  private static int queueTimeout = 20;  //20秒为超时 7
  8
  public Order() 9
  : base(queuePath, queueTimeout) 10
     { 11
  queue.Formatter = new BinaryMessageFormatter(); 12
  Console.WriteLine(queuePath); 13
  } 14
  15
  public new OrderInfo Receive() 16
     { 17
  //该方法会应用在分布式事务中,故而设置为Automatic的事务类型。 18
  base.transactionType = MessageQueueTransactionType.Automatic; 19
  return (OrderInfo)((Message)base.Receive()).Body; 20
  } 21
  22
  public OrderInfo Receive(int timeout) 23
     { 24
  base.timeout = TimeSpan.FromSeconds(Convert.ToDouble(timeout)); 25
  return Receive(); 26
  } 27
  28
   /**//// <summary> 29
  /// 异步发送定单到消息队列 30
  /// </summary> 31
  /// <param name="orderMessage"></param> 32
  public void Send(OrderInfo orderMessage) 33
     { 34
  //该方法不会用于分布式事务中,故而设置为Single的事务类型。 35
  base.transactionType = MessageQueueTransactionType.Single; 36
  base.Send(orderMessage); 37
  } 38
  } 39
  } UML草图: 
  这里需要注意的是,Order类既继承了基类PetShopQueue,同时还实现了接口IOrder,而在消息接口和基类中所定义的接收消息(Receive)方法在方法的签名上是相同的。所以在Order的Receive方法实现中,必须使用new而非override关键字来重写父类PetShopQueue的Receive虚方法。此时Order类的Receive方法代表两个含义,一是实现了消息接口IOrder中的Receive方法;二则是利用了new关键字重写了父类PetShopQueue的Receive虚方法。  在PetShop4.0中,将面向对象的知识点应用得非常精妙,如上分析,此时我门可以怎么来调用Order呢?是这样吗?  
                          1 //1、使用基类PetShopQueue 2
  PetShopQueue order = new Order(); 3
  order.Receive(); 4
  5
  //2、使用消息接口IOrder 6
  IOrder  order = new Order(); 7
  order.Receive(); 根据多态原理,上面这两种实现都是正确的,那我们那取谁舍谁呢?在PetShop4.0中正确的调用是第2种方法,这种调用方法也更符合“面向接口设计”的原则。----详细请查看消息工厂MessageFactory部分的介绍。  5、后台处理应用程序OrderProessor  前面一系列的操作,最终都会走向到这里,这里实现了最终的插入数据库的操作。在PetShop 4.0中OrderProessor是一个控制台应用程序,根据需求也可以将其设计为Windows 
                          Service。他完成的操作就是接收消息队列里的订单数据,将其插入到数据库。在OrderProessor里使用了多线程技术,监视队列里的订单信息,定期的将其处理。在主方法Main方法中用于控制线程序,核心的执行任何则委托给ProcessOrders方法去实现。  
                           1 private static void ProcessOrders() 2
    { 3
  TimeSpan tsTimeout = TimeSpan.FromSeconds(Convert.ToDouble(transactionTimeout * batchSize)); 4
  Order order = new Order();  //逻辑层的PetShop.BLL.Order 5
  while (true) 6
     { 7
  TimeSpan datatimeStarting = new TimeSpan(DateTime.Now.Ticks); 8
  double elapsedTime = 0; 9
  10
  int processedItems = 0; 11
  ArrayList queueOrders = new ArrayList(); 12
  13
  //首先验证事务 14
  using (TransactionScope ts = new TransactionScope(TransactionScopeOption.Required, tsTimeout)) 15
     { 16
  //从队列中检索订单 17
  for (int i = 0; i < batchSize; i++) 18
     { 19
  try 20
     { 21
  //在一定时间 类接收队列的订单 22
  if ((elapsedTime + queueTimeout + transactionTimeout) < tsTimeout.TotalSeconds) 23
     { 24
  queueOrders.Add(order.ReceiveFromQueue(queueTimeout)); 25
  } 26
  else 27
     { 28
  i = batchSize;  // 结束循环 29
  } 30
  elapsedTime = new TimeSpan(DateTime.Now.Ticks).TotalSeconds - datatimeStarting.TotalSeconds; 31
  } 32
  catch (TimeoutException) 33
     { 34
  //没有可以等待的消息也结束循环 35
  i = batchSize; 36
  } 37
  } 38
  39
  //处理队列的订单 40
  for (int k = 0; k < queueOrders.Count; k++) 41
     { 42
  order.Insert((OrderInfo)queueOrders[k]); 43
  processedItems++; 44
  totalOrdersProcessed++; 45
  } 46
  //处理完毕或者是超时 47
  ts.Complete(); 48
  } 49
  Console.WriteLine("(Thread Id " + Thread.CurrentThread.ManagedThreadId + ") batch finished, " + processedItems + " items, in " + elapsedTime.ToString() + " seconds."); 50
  } 51
  } ProcessOrders方法首先通过业务逻辑层PetShop.BLL.Order类的方法ReceiveFromQueue去获取消息队列中的订单数据,并将其放入一个ArrayList对象,然后将其插入到数据库。 
                          OrderProcessor的完整代码定义如下:   
                            Code 1
  using System; 2
  using System.Collections.Generic; 3
  using System.Text; 4
  using System.Configuration; 5
  using System.Threading; 6
  using PetShop.BLL; 7
  using System.Collections; 8
  using System.Transactions; 9
  using PetShop.Model; 10
  11
  namespace PetShop.OrderProcessor 12
    { 13
  class Program 14
     { 15
  private static int transactionTimeout = int.Parse(ConfigurationManager.AppSettings["TransactionTimeout"]); 16
  private static int queueTimeout = int.Parse(ConfigurationManager.AppSettings["QueueTimeout"]); 17
  private static int batchSize = int.Parse(ConfigurationManager.AppSettings["BatchSize"]); 18
  private static int threadCount = int.Parse(ConfigurationManager.AppSettings["ThreadCount"]); 19
  private static int totalOrdersProcessed = 0; 20
  21
  static void Main(string[] args) 22
     { 23
  Thread workTicketThread; 24
  Thread[] workerThreads = new Thread[threadCount]; 25
  26
  for (int i = 0; i < threadCount; i++) 27
     { 28
  workTicketThread = new Thread(new ThreadStart(ProcessOrders)); 29
  //指示线呈是否为一后台线程 30
  workTicketThread.IsBackground = true; 31
  workTicketThread.SetApartmentState(ApartmentState.STA); 32
  33
  workTicketThread.Start(); 34
  workerThreads[i] = workTicketThread; 35
  } 36
  37
  Console.WriteLine("开始处理,按任意键停止."); 38
  Console.ReadLine(); 39
  Console.WriteLine("正在终止线程,请等待   "); 40
  41
  //终止所以线程 42
  for (int i = 0; i < workerThreads.Length; i++) 43
     { 44
  workerThreads[i].Abort(); 45
  } 46
  47
  Console.WriteLine(); 48
  Console.WriteLine(totalOrdersProcessed + " 张订单已经处理."); 49
  Console.WriteLine("已终止处理.按任意键退出   "); 50
  Console.ReadLine(); 51
  } 52
  53
  private static void ProcessOrders() 54
     { 55
  TimeSpan tsTimeout = TimeSpan.FromSeconds(Convert.ToDouble(transactionTimeout * batchSize)); 56
  Order order = new Order();  //逻辑层的PetShop.BLL.Order 57
  while (true) 58
     { 59
  TimeSpan datatimeStarting = new TimeSpan(DateTime.Now.Ticks); 60
  double elapsedTime = 0; 61
  62
  int processedItems = 0; 63
  ArrayList queueOrders = new ArrayList(); 64
  65
  //首先验证事务 66
  using (TransactionScope ts = new TransactionScope(TransactionScopeOption.Required, tsTimeout)) 67
     { 68
  //从队列中检索订单 69
  for (int i = 0; i < batchSize; i++) 70
     { 71
  try 72
     { 73
  //在一定时间 类接收队列的订单 74
  if ((elapsedTime + queueTimeout + transactionTimeout) < tsTimeout.TotalSeconds) 75
     { 76
  queueOrders.Add(order.ReceiveFromQueue(queueTimeout)); 77
  } 78
  else 79
     { 80
  i = batchSize;  // 结束循环 81
  } 82
  elapsedTime = new TimeSpan(DateTime.Now.Ticks).TotalSeconds - datatimeStarting.TotalSeconds; 83
  } 84
  catch (TimeoutException) 85
     { 86
  //没有可以等待的消息也结束循环 87
  i = batchSize; 88
  } 89
  } 90
  91
  //处理队列的订单 92
  for (int k = 0; k < queueOrders.Count; k++) 93
     { 94
  order.Insert((OrderInfo)queueOrders[k]); 95
  processedItems++; 96
  totalOrdersProcessed++; 97
  } 98
  //处理完毕或者是超时 99
  ts.Complete(); 100
  } 101
  Console.WriteLine("(Thread Id " + Thread.CurrentThread.ManagedThreadId + ") batch finished, " + processedItems + " items, in " + elapsedTime.ToString() + " seconds."); 102
  } 103
  } 104
  } 105
  } 106
   MSMQ技术除了用于异步处理之外,还可作为一种分布式处理技术应用。关于使用MSMQ进行分布式处理相关的内容,本人能力有限,还请大家查看相关的资料进行了解。  本文介绍了MSMQ和多线程以及对PetShop 4.0中对MSMQ的应用进行了分析,结合之前我写的两篇关于MSMQ的相关知识点的介绍文章,对MSMQ算是建立了一个全面的认识,希望这三篇文章对大家学习MSMQ有所帮助。 文中示例代码下载 
                       |