在平常的项目设计中,我们经常会用到各种各样的队列来实现分布式系统的异步调用逻辑和数据消息的传递,从而来达到增强应用程序的性能和可伸缩性,通过事务性队列的离线消息处理机制更可以大大提高系统的可靠性。SQLServer自2005以后版本便增加了一个新的内置队列处理应用模块ServiceBroker,此功能模块大大简化了队列的使用操作,更方便的是能和原先的SQLServer系统在事务处理上完美的结合在一起。可是唯一的缺陷是增加了设计的耦合性。
接下来我们就如何使用ServiceBroker来做一个场景描述。我们需要设计一个Web系统,系统由一个主Web系统和多个子Web系统组成。有一种情况下,在主模块中会产生一类行为数据,这类行为数据需要传播或者记录到各个子模块的数据库中处理。要如何实现这样的功能呢,当然我们可以选用多种现有技术来实现,如(WebService,链接服务器技术,WCF等技术)。在这里为了说明ServiceBroker的简单易用性,我们为此做一简单示例。
前提:各个应用数据库要允许ServiceBroker和设置数据库主密钥
代码部署划分:1公共部分(初始方和目标方共同使用),2.初始方,3.目标方
示例具体实现步骤主要分为(具体参数详细配置请参考MSDN文档):
1.实现ServiceBroker消息、队列和服务
公共部分
定义消息类型架构集合
CREATEXMLSCHEMACOLLECTION
[http://Samples/SQL/ServiceBroker/msgOperationSchema]
ASN'?xmlversion="1.0"?
xs:schemaxmlns:xs="http://www.w3.org/2001/XMLSchema"
xs:elementname="msgOperation"
xs:complexType
xs:sequence
xs:elementname="msgId"type="xs:int"/
xs:elementname="msgContent"type="xs:string"/
/xs:sequence
/xs:complexType
/xs:element
/xs:schema';
定义消息类型
CREATEMESSAGETYPE[http://Samples/SQL/ServiceBroker/msgOperation]
VALIDATION=VALID_XMLWITHSCHEMACOLLECTION
[http://Samples/SQL/ServiceBroker/msgOperationSchema];
定义消息契约
CREATECONTRACT[http://Samples/SQL/ServiceBroker/msgOperationContract]
(
[http://Samples/SQL/ServiceBroker/msgOperation]
SENTBYINITIATOR
);
初始方
定义队列
(本文来源于图老师网站,更多请访问http://m.tulaoshi.com/bianchengyuyan/)CREATEQUEUEmsgOperationInitQueue
WITH
STATUS=ON,
RETENTION=OFF
GO
定义初始服务
CREATESERVICE[http://Samples/SQL/ServiceBroker/msgOperationInitService]
ONQUEUEmsgOperationInitQueue
([http://Samples/SQL/ServiceBroker/msgOperationContract]);
GO
定义初始存储过程
CREATEPROCEDUREdbo.usp_msgOperation_SET
@msgIdint,
@msgContentnvarchar(2000)
AS
declare@message_bodyasxml([http://Samples/SQL/ServiceBroker/msgOperationSchema]);
declare@dialogasuniqueidentifier;
--填充消息体
SET@message_body='msgOperation
msgId'+cast(@msgIdasvarchar)+'/msgId
msgContent'+@msgContent+'/msgContent
/msgOperation';
BEGINDIALOG@dialog
FROMSERVICE[http://Samples/SQL/ServiceBroker/msgOperationInitService]
TOSERVICE'http://Samples/SQL/ServiceBroker/msgOperationProcessService'
ONCONTRACT[http://Samples/SQL/ServiceBroker/msgOperationContract];
--WITHENCRYPTION=OFF,LIFETIME=3600;
--发送消息
SENDONCONVERSATION@dialog
MESSAGETYPE[http://Samples/SQL/ServiceBroker/msgOperation](@message_body);
ENDCONVERSATION@dialog;
GO
目标方
定义队列处理存储过程
CREATEPROCEDUREdbo.usp_msgOperation_CMDAS
RETURN0
GO
定义队列
(本文来源于图老师网站,更多请访问http://m.tulaoshi.com/bianchengyuyan/)CREATEQUEUEmsgOperationProcessQueue
WITH
STATUS=ON,
RETENTION=OFF,
ACTIVATION
(
STATUS=ON,
PROCEDURE_NAME=dbo.usp_msgOperation_CMD,
MAX_QUEUE_READERS=1,
EXECUTEASSELF
);
实现队列处理存储过程
ALTERPROCEDUREdbo.usp_msgOperation_CMD
AS
declare@message_bodyasxml;
declare@message_typeassysname;
declare@dialogasuniqueidentifier;
WHILE(1=1)
BEGIN
BEGINTRANSACTION
--接收下一条可用的消息
WAITFOR(
RECEIVETOP(1)--一次只处理一条消息
@message_type=message_type_name,
@message_body=message_body,
@dialog=[conversation_handle]
FROMdbo.msgOperationProcessQueue
),TIMEOUT2000
--如果没收到任何消息则跳出循环
IF(@@ROWCOUNT=0)
BEGIN
ROLLBACKTRANSACTION
BREAK;
END
--根据接收的消息类型执行不同的消息处理逻辑
IF(@message_type='http://schemas.microsoft.com/SQL/ServiceBroker/EndDialog')
BEGIN
ENDCONVERSATION@dialog;
END
ELSEIF(@message_type='http://schemas.microsoft.com/SQL/ServiceBroker/Error')
BEGIN
ENDCONVERSATION@dialog;
END
ELSEIF(@message_type='http://Samples/SQL/ServiceBroker/msgOperation')
BEGIN
declare@msgIdint
declare@msgContentnvarchar(2000)
BEGINTRY
SET@msgId=@message_body.value('data(//msgId)[1]','int');
SET@msgContent=@message_body.value('data(//msgContent)[1]','nvarchar(2000)');
--此处可以处理自定义业务逻辑
ENDCONVERSATION@dialog;
ENDTRY
BEGINCATCH
ROLLBACKTRANSACTION
CONTINUE
ENDCATCH
END
COMMITTRANSACTION
END
GO
实现目标处理服务
CREATESERVICE[http://Samples/SQL/ServiceBroker/msgOperationProcessService]
ONQUEUEdbo.msgOperationProcessQueue
([http://Samples/SQL/ServiceBroker/msgOperationContract])
GO
2.实现ServiceBroker安全配置
在一台数据库服务器上的不同数据库之间的安全配置比较简单,默认情况下数据库之间是没有外部访问权限的,要实现你就需要在本地服务器上开启模拟上下文的数据库模块,即在数据库中设置 ALTER DATABASE database_name SET TRUSTWORTHY ON 来实现互相访问的目的。
这里我们需要实现一种更灵活,更安全的配置方式,那就是基于证书的安全配置。
初始方
创建拥有服务的用户
CREATEUSERmsgOperationInitServiceUserWITHOUTLOGIN;
ALTERAUTHORIZATIONON
SERVICE::[http://Samples/SQL/ServiceBroker/msgOperationInitService]
TO
msgOperationInitServiceUser;
创建与该用户关联的私钥证书
CREATECERTIFICATEmsgOperactionInitServiceCertPrivAUTHORIZATIONmsgOperationInitServiceUser
WITHSUBJECT='ForMsgOperactionInitService',
START_DATE='01/01/2009',
EXPIRY_DATE='01/01/2100';
将公钥证书备份到文件以供目标方服务使用
BACKUPCERTIFICATEmsgOperactionInitServiceCertPriv
TOFILE='X:**msgOperactionInitServiceCertPub.cer';
创建调用目标服务的用户
CREATEUSERmsgOperationProcessServiceUserWITHOUTLOGIN;
导入目标服务的证书并把刚才创建的用户设为所有者
CREATECERTIFICATEmsgOperactionProcessServiceCertPubAUTHORIZATIONmsgOperationProcessServiceUser
FROMFILE='X:**msgOperactionProcessServiceCertPub.cer';
建立目标服务远程服务绑定
CREATEREMOTESERVICEBINDINGToMsgOperactionProcessService
TOSERVICE'http://Samples/SQL/ServiceBroker/msgOperationProcessService'
WITHUSER=msgOperationProcessServiceUser;
目标方
创建拥有服务的用户
CREATEUSERmsgOperationProcessServiceUserWITHOUTLOGIN;
ALTERAUTHORIZATIONONSERVICE::[http://Samples/SQL/ServiceBroker/msgOperationProcessService]TOmsgOperationProcessServiceUser;
创建与该用户关联的私钥证书
CREATECERTIFICATEmsgOperactionProcessServiceCertPrivAUTHORIZATIONmsgOperationProcessServiceUser
WITHSUBJECT='ForMsgOperactionProcessService',
START_DATE='01/01/2009',
EXPIRY_DATE='01/01/2100';
将公钥证书备份到文件以供初始方服务使用
BACKUPCERTIFICATEmsgOperactionProcessServiceCertPriv
TOFILE='X:**msgOperactionProcessServiceCertPub.cer';
创建调用初始服务的用户
CREATEUSERmsgOperationInitServiceUserWITHOUTLOGIN;
导入初始服务的证书并把刚才创建的用户设为所有者
CREATECERTIFICATEmsgOperactionInitServiceCertPubAUTHORIZATIONmsgOperationInitServiceUser
FROMFILE='X:**msgOperactionInitServiceCertPub.cer';