知识大全 基于WCF和MSMQ构建发布/订阅消息总线
Posted 消息
篇首语:你若无书相伴,便没了长相伴。本文由小常识网(cha138.com)小编为大家整理,主要介绍了知识大全 基于WCF和MSMQ构建发布/订阅消息总线相关的知识,希望对你有一定的参考价值。
基于WCF和MSMQ构建发布/订阅消息总线 以下文字资料是由(全榜网网www.cha138.com)小编为大家搜集整理后发布的内容,让我们赶快一起来看一下吧!
本文是翻译Tom Hollander先生的blog文章《Building a Pub/Sub Message Bus with WCF and MSMQ》 对英文blog文章感兴趣的朋友 可以直接访问文章下面的链接 译者 开源论坛小组
这几年经常谈到采用事件驱动架构(event driven architecture)技术来构建可扩展的 可维护的系统 我发现在一些方案中 这是非常有趣的模型 但是 在Microsoft平台上这种架构一直没有很好的支持 因此 许多人发现比较难以实现 几年以前 在一个系统中 我采用 Net Remoting / MSMQ / HTTP 构建了发布/订阅消息总线(Pub/Sub Message Bus) 但是觉得不是很完美 很多地方实现比较困难 并且需要定制的代码 如承载队列监听(host the queue listeners) 编码/解码消息 处理可靠性和管理订阅者等等
在我目前的项目中 我再一次尝试了这一模型 然而 这几年技术发展有很大的变化 我高兴的说 我的体验比以前要好多了 在说明我的方案之前 我想提醒的一点是 我仅仅是描述实现这一模型的一种方法 在不同的应用需求情况下 应该有其他更合适的方法 因为我工作的项目是一个大的 Net应用系统 因此跨平台的互操作不需要考虑(真幸运!)
我们完成的项目构建在 NET Framework 平台 并使用了WCF / MSMQ / IIS 部署在Windows server 平台上
定义Service Contract 第一步定义Contact 发布者用来通知订阅者 发生事件 在我们的项目中 有许多不同的事件类型 但是为了尽可能重用代码 我们使用了泛型Service Contract [ServiceContract] public interface IEventNotification<TLog> [OperationContract(IsOneWay = true)] void OnEventOccurred(TLog value);
对于任一事件类型 我们可以简单定义一个Data Contract 来负载数据(carry the payload) 并提供一个继承的Service Contract 类型 如下所示 [ServiceContract] public interface IAccountEventNotification : IEventNotification<AccountEventLog> 实现发布者(Publisher) 发布/订阅模型中最重要的一点是发布和订阅方应该是低耦合的 尤其是发布者不应该知道订阅者的任何事情 包括多少个订阅者和订阅者在哪里 起初 我们试图采用MSMQ的PGM多播特性来实现 – 实质上让你定义单个队列地址 悄悄地路由相同的消息到多个目标队列 虽然这可以满足要求 但是这一方案也存在一些缺点 第一 在WCF中使用多播队列地址的唯一方式是采用MsmqIntegrationBinding MsmqIntegrationBinding没有NetMsmqBinding灵活 其次 多播地址仅仅适用非事务队列(non transactional queues) 这样对我们的系统产生不能接受的可靠性影响
因此 我们放弃了这一方案 决定在发布者实现我们自己的轻量级多播 技术上而言 这样打破了 发布者完全不了解订阅者 这一黄金规则 所有订阅者的信息完全存放在配置文件中 这意味着 我们增加 修改或删除订阅者完全不影响代码
我们开发一个组件 ServiceFactory(与p&p Web Service Sofare Factory 没有关系) ServiceFactory只是一个简单的抽象 通过读取配置文件来创建本地或WCF实例 ServiceFactory组件没有对外公布 但是你可以简单替换为你喜欢的依赖注入框架(Dependency Injection Framework) 达到相同的效果 在我们的系统中 Web Services的配置文件nfig 可能有如下定义的依赖服务 <serviceFactory> <services> <addname= EmailUtility contract= MyProject IEmailUtility MyProject type= MyProject EmailUtility MyProject mode= SameAppDomain instanceMode= Singleton enablePolicyInjection= false /> <addname= SubsctiberXAccountEventNotification contract= MyProject Contracts IAccountEventNotification MyProject Contracts mode= Wcf endpoint= SubsctiberXAccountEventNotification /> <addname= SubsctiberYAccountEventNotification contract= MyProject Contracts IAccountEventNotification MyProject Contracts mode= Wcf endpoint= SubsctiberYAccountEventNotification /> </services> </serviceFactory>
我们使用ServiceFactory创建单个实例 代码如下 IEmailUtility email = ServiceFactory GetService<IEmailUtility>();
根据上面的配置文件 上述代码将获得一个本地EmailUtility单件实例 但是不同的配置可以返回一个WCF proxy 代理类实例 可以非常方便重用ServiceFactory组件 返回所有配置的 匹配特定Contract的服务 我们将根据这些 构建NotificationPublisher类 代码如下 public class NotificationPublisher<TInterface TLog> where TInterface : class IEventNotification<TLog> public static void OnEventOccurred(TLog value) List<TInterface> subscribers = ServiceFactory GetAllServices<TInterface>();
foreach (TInterface subscriber in subscribers) subscriber OnEventOccurred(value);
根据上述代码 发布者发布事件所需要做的是 传入合适的泛型参数 实例化NotificationPublisher对象 并调用OnEventOccured 方法 假定我们使用IAccountEventNotification 接口和上述配置 这样事件将通过WCF到达SubscriberXAccountEventNotification 和 SubscriberYAccountNotification 端点 并触发相关事件
配置发布者 发布端最后一部分是WCF配置 如上述所提及的 我们选择使用MSMQ提供可靠的 异步的消息传递 过去编写MSMQ代码比较困难 但是对WCF编程模型而言 MSMQ与其他传输协议没什么区别 在我们的案例中 我们选择了NetMsmqBinding NetMsmqBinding 为核心MSMQ特性提供了全面访问WCF功能(与MsmqIntegrationBinding不同 MsmqIntegrationBinding提供了更丰富的MSMQ支持 但是限制了WCF功能)
如下是客户端的WCF的配置示例 <system serviceModel> <bindings> <netMsmqBinding> <bindingname= TransactionalMsmqBinding exactlyOnce= true deadLetterQueue= system /> </netMsmqBinding> </bindings> <client> <endpointname= SubscriberXAccountEventNotification address= net msmq://localhost/private/SubscriberX/accounteventnotification svc binding= netMsmqBinding bindingConfiguration= TransactionalMsmqBinding contract= MyProject Contracts IAccountEventNotification /> <endpointname= SubscriberYAccountEventNotification address= net msmq://localhost/private/SubscriberY/accounteventnotification svc binding= netMsmqBinding bindingConfiguration= TransactionalMsmqBinding contract= MyProject Contracts IAccountEventNotification /> </client> </system serviceModel>
上述配置没什么特别的地方 – 需要关注的是 exactlyOnce= true 设置 这是事务队列必须的设置 另外就是 net msmq:// 地址语法 这是NetMsmqBinding 协议所需要的 私有队列分别为 SubscriberX/accounteventnotification svc 和 SubscriberY/accountnotification svc 为什么我给队列这样愚蠢的命名呢?继续读下面内容
承载和配置订阅者 在过去 如果说创建MSMQ客户端是烦人的 那么创建MSMQ服务更是噩梦 你不得不创建你自己的host(一般而言为Windows Service)或者使用一些灵活的MSMQ触发器功能 然后 你需要做很多工作 确保你的服务没有丢失消息 或者没有被 poison messages 所阻塞 因为poison message错误的消息体(malformed payload)会不断导致你的服务失败
就像在客户端一样 WCF需要很多工作在服务端 – 但是这些不是直接帮助承载服务和监听队列 幸运的是 这一问题由Windows Vista和Windows server 中提供的IIS 和Windows Activation Services (WAS) 轻松解决 IIS 负责监听MSMQ / tcp / Named Pipes 并且激活WCF 服务 就像 IIS 监听HTTP一样 听起来不错 但是需要提醒的是 – 这些需要灵巧的配置
首先 你需要在IIS中设置application 指向service 包括 svc文件和nfig配置文件 这与在IIS 通过HTTP部署service一样
接着 你需要创建消息队列 你可以通过Vista的Computer Management console 或Windows server 的Server Manager配置 队列的名称必须匹配application name 加上 svc 文件名 例如 SubscriberX/accounteventnotification svc 在创建队列时 确保标记队列支持事务 因为随后不能改变 你也需要设置队列的权限 这样运行Net Msmq Listener 服务的帐号(缺省为NEORK SERVICE)能够接收消息 任何运行Client/Publisher 都能够发送消息(缺省为NEORK SERVICE)
最后 你需要配置IIS和WAS 的站点和特定application支持Net Msmq 监听器(在开始操作之前 确保你已经安装了WAS和non HTTP激活windows组件) 最简单的办法是使用appcmd exe 命令行(\\system \\inetsrv目录下) appcmd set site Default Web Site +bindings [protocol= net msmq bindingInformation= localhost ] appcmd set app Default Web Site/SubscriberX /enabledProtocols:net msmq 配置好IIS后 接下来是确保service的WCF配置是正确的 如同你期望的那样 这将与客户端的配置非常相似 <system serviceModel> <bindings> <netMsmqBinding> <bindingname= TransactionalMsmqBinding exactlyOnce= true deadLetterQueue= system receiveErrorHandling= Move /> </netMsmqBinding> </bindings> <services> <servicename= SubscriberX NotificationService > <endpointcontract= MyProject Contracts IAccountEventNotification bindingConfiguration= TransactionalMsmqBinding binding= netMsmqBinding address= net msmq://localhost/private/SubscriberX/accounteventnotification svc /> </service> </services> </system serviceModel>
值得说明的一点是 receiveErrorHandling= Move 这一属性可以帮助节省我们一个月的工作 这一属性让WCF转移多次重复处理失败的消息到MSMQ的子队列poison 然后继续处理下一个消息 而不是阻塞服务 这一子队列和远程队列事务性读取等待功能是Vista 和 windows server 中MSMQ 的一些新特性
实现订阅者(Subscribers) 最后一件事是实现订阅者 当然 最多的代码是特定业务逻辑的实现 因为我仅仅描述service interface的实现 在我们的系统中 确保没有消息丢失是非常重要的 既然MSMQ能够确保消息的到达 因此消息不会无缘故的消失 事实上 大多数消息的丢失是在MSMQ成功传递消息到达service 之后 有可能service 接收到消息之后 在service 成功处理消息之前 发生异常导致失败(可能由于bug或配置问题) 避免这一问题最好的办法是使用事务 跨越从队列接收消息和业务逻辑的处理 如果发生失败 将回滚事 包括从队列中接收的消息 如果只是一个临时的小故障 消息将再次被成功处理 如果问题持续或是错误的消息 在经过多次尝试后 该消息将被认为是poison 消息 如前面提及的 该消息将被转移到poison 子队列 由管理员手动处理
上述所有的工作非常简单 因为MSMQ和WCF支持所有这些特性(假定你使用事务性队列) 你需要做的工作是由一些attributes标记你的服务实现 声明当消息从队列取出后 业务逻辑应该登记事务
public class NotificationService : IAccountEventNotification [OperationBehavior(TransactionScopeRequired = true TransactionAutoComplete = true)] public void OnEventOccurred(AccountEventLog value) // Business specific logic 如对上述solution 的实现有疑问或改进建议 欢迎到我们的论坛( )进行交流
结论 这是我最近最长的blog文章之一 这一解决方案非常强大且特别简单去实现 这是由于WCF / MSMQ / IIS 技术的先进性 在过去 许多人(包括我)花了几个月的时间尽力去实现pub/sub模式 往往不能达到预期的效果 现在 使用这些的技术消除了大量的定制代码 事实上 这篇文章中少量的代码和配置脚本实现了pub/sub模式
译者注 我们是 开源论坛小组 负责 / YAF 开源ASP NET/C# 论坛的开发工作 免费提供项目源代码下载 欢迎您访问 下载 交流和学习 包括Enterprise Message Bus / WCF / MSMQ / BizTalk / SSB / sql server / Net Framework 等等
下载 / YAF 开源论坛 v (ASP NET/C#) x?g=posts&t=
cha138/Article/program/Java/hx/201311/26674相关参考
一起学WCF--消息通信模式 以下文字资料是由(全榜网网www.cha138.com)小编为大家搜集整理后发布的内容,让我们赶快一起来看一下吧! 这一节大家共同研究下
Dojo订阅/发布模拟淘宝到货提醒 以下文字资料是由(全榜网网www.cha138.com)小编为大家搜集整理后发布的内容,让我们赶快一起来看一下吧! 在淘宝我们对某
知识大全 ASP.NET入门教程 12.10.1加密和基于消息的安全性
ASP.NET入门教程12.10.1加密和基于消息的安全性 以下文字资料是由(全榜网网www.cha138.com)小编为大家搜集整理后发布的内容,让我们赶快一起来看一
五八同城怎样发布消息,如何在58同城免费发布消息在58同城注册一个账户,登陆后点击我要发布,然后选择你要发布的类别,编辑内容就可以了!五八同城为啥发布消息审核不通过要注意不通过的提示,一般需要个人实名
构建基于Web/XML的信息集成研究 以下文字资料是由(全榜网网www.cha138.com)小编为大家搜集整理后发布的内容,让我们赶快一起来看一下吧! 摘要在当前的
基于Jazz技术构建企业级Web2.0应用 以下文字资料是由(全榜网网www.cha138.com)小编为大家搜集整理后发布的内容,让我们赶快一起来看一下吧! Jaz
针对当前污水处理控制系统的规模不断扩大、监控设备日益复杂、性能日益提高的状况,传统的监控模式已经制约了污水处理控制系统的发展。为此构建了一种基于OPC技术和CAN现场总线技术的分布式污水处理控制系统体
针对当前污水处理控制系统的规模不断扩大、监控设备日益复杂、性能日益提高的状况,传统的监控模式已经制约了污水处理控制系统的发展。为此构建了一种基于OPC技术和CAN现场总线技术的分布式污水处理控制系统体
针对当前污水处理控制系统的规模不断扩大、监控设备日益复杂、性能日益提高的状况,传统的监控模式已经制约了污水处理控制系统的发展。为此构建了一种基于OPC技术和CAN现场总线技术的分布式污水处理控制系统体
基于WebSphereMQ的收发消息程序 以下文字资料是由(全榜网网www.cha138.com)小编为大家搜集整理后发布的内容,让我们赶快一起来看一下吧! WebS