0
有些情況下,我需要將我的一些消息組合在一起,以便我的工作進程可以按順序對它們進行處理。使用SSB在對話中發送和接收多條消息
但我找到的每個例子都只顯示瞭如何爲每個對話做一條消息。
我在尋找的是一個在對話中發送多條消息的例子,以及一個演示如何處理它們的例子。 (我可能會將它們都發送出去,但我似乎只能退出。)
有些情況下,我需要將我的一些消息組合在一起,以便我的工作進程可以按順序對它們進行處理。使用SSB在對話中發送和接收多條消息
但我找到的每個例子都只顯示瞭如何爲每個對話做一條消息。
我在尋找的是一個在對話中發送多條消息的例子,以及一個演示如何處理它們的例子。 (我可能會將它們都發送出去,但我似乎只能退出。)
對話提供了按順序處理消息的邊界 - 所以您需要發送組中的所有消息使用相同的ConversationId。我這樣做的方式是在創建時存儲ConversationId的實用程序表,以便每次發送消息時,都會查找適當的conversationid以發送消息。
SELECT @conversationHandle = ConversationHandle FROM Qproc.SessionConversation
WHERE
FromService = @fromService
AND ToService = @toService
AND OnContract = @onContract
AND Terminated IS NULL
IF @conversationHandle IS NULL
BEGIN
BEGIN DIALOG CONVERSATION @conversationHandle
FROM SERVICE @fromService
TO SERVICE @toService
ON CONTRACT @onContract
WITH ENCRYPTION = OFF; --, LIFETIME = 60*60*24*100;
-- Store the ongoing conversation for further use
INSERT INTO QProc.SessionConversation (FromService, ToService, OnContract,ConversationHandle)
VALUES( @fromService, @toService, @onContract, @conversationHandle)
END
-- Create the dialog timer, timeout is seconds; this will notify the ClientQueue if nothing has happened on the conversation
--in the timeout period
BEGIN CONVERSATION TIMER (@conversationHandle) TIMEOUT = 60*8;
SEND ON CONVERSATION @conversationHandle
MESSAGE TYPE [http://COMPANYNAME/AsyncTriggerRequestMesssage]
(@messageBody);
的原因,你只能看到在另一端一個消息是與會話組鎖定的事 - 你應該在這讀了上面的理解是怎麼回事,但基本上一旦消息處理程序已經看到了消息,它在消息隊列上的觀點僅限於單個對話。一旦您重複使用同一個會話ID,這不會成爲問題。下面是一個例子得到:
DECLARE @RecvReqDlgHandle UNIQUEIDENTIFIER;
DECLARE @RecvReqMsg VARCHAR(8000);
DECLARE @RecvReqMsgName sysname;
WHILE (1=1)
BEGIN
BEGIN TRANSACTION;
WAITFOR
(RECEIVE TOP(1)
@RecvReqDlgHandle = conversation_handle,
@RecvReqMsg = message_body,
@RecvReqMsgName = message_type_name
FROM QProc.AsyncTaskServiceQueue
), TIMEOUT 500;
IF @@ROWCOUNT=0
BEGIN
ROLLBACK TRANSACTION;
BREAK
END
IF @RecvReqMsgName = 'http://schemas.microsoft.com/SQL/ServiceBroker/EndDialog'
BEGIN
END CONVERSATION @RecvReqDlgHandle;
END
IF @RecvReqMsgName='http://COMPANYNAME/AsyncTriggerRequestMesssage'
BEGIN
DECLARE @BodyDoc XML;
SET @BodyDoc=CONVERT(XML, @RecvReqMsg) ;
EXEC QProc.AsyncTaskRunTask @BodyDoc;
END
COMMIT TRANSACTION;
END
最後,你需要清理這些對話了,一旦他們不再使用了,這樣的事情:
DECLARE @conversationHandle UNIQUEIDENTIFIER;
DECLARE @messageTypeName SYSNAME;
BEGIN TRANSACTION;
RECEIVE TOP(1)
@conversationHandle = conversation_handle,
@messageTypeName = message_type_name
FROM QProc.AsyncTaskClientQueue;
IF @conversationHandle IS NOT NULL
BEGIN
--If the DialogTimer message arrives, then there has been no activity on this conversation for a while (see timeout setting in [QProc].[DispatchAsyncTaskMessage])
--so we terminate gracefully and go home.
IF @messageTypeName = 'http://schemas.microsoft.com/SQL/ServiceBroker/DialogTimer'
OR @messageTypeName = 'http://schemas.microsoft.com/SQL/ServiceBroker/EndDialog'
BEGIN
END CONVERSATION @conversationHandle;
UPDATE Qproc.SessionConversation SET TERMINATED = getUtcDate() WHERE ConversationHandle = @conversationHandle;
END
END
COMMIT TRANSACTION;