2015-06-12 35 views
1

我們通過表上的觸發器將消息放入SQL Server消息隊列中。 (當一個字段被更新時,我們建立一些XML,並在下面調用觸發器)。SQL Message Broker在發送隊列中留下消息

CREATE PROCEDURE [dbo].[up_CarePay_BrokerSendXml] 
    -- Add the parameters for the stored procedure here 
    @Data VARCHAR(MAX) 

AS 
BEGIN 

    DECLARE @InitDlgHandle UNIQUEIDENTIFIER 
    DECLARE @RequestMessage VARCHAR(1000) 
    BEGIN TRY 
      BEGIN TRAN 

       BEGIN DIALOG CONVERSATION @InitDlgHandle 
       FROM SERVICE [//IcmsCarePay/Service/Initiator] 
       TO SERVICE N'//IcmsCarePay/Service/Target' 
       ON CONTRACT [//IcmsCarePay/Contract] 
       WITH ENCRYPTION = OFF; 

       SEND ON CONVERSATION @InitDlgHandle 
       MESSAGE TYPE [//IcmsCarePay/Message/Request] (@Data); 

      COMMIT TRAN; 
    END TRY 
    BEGIN CATCH 
      ROLLBACK TRAN; 
      DECLARE @Message VARCHAR(MAX); 
      SELECT @Message = ERROR_MESSAGE(); 
      PRINT @Message 
    END CATCH; 

END 

This works。消息放入隊列中。

然後將消息發送到同一服務器上的接收隊列 - 不同的數據庫。然後,我們每分鐘運行一次proc,從目標隊列中抓取消息,並將其處理到臨時表中進行處理。該消息然後不在目標隊列中,並且這一切都沒有錯誤地工作。

但是......

當我檢查initiaitor隊列,當消息來自,它填補了消息。

SELECT TOP 1000 *, casted_message_body = 
CASE message_type_name WHEN 'X' 
    THEN CAST(message_body AS NVARCHAR(MAX)) 
    ELSE message_body 
END 
FROM [ICMS].[dbo].[IcmsCarePayInitiatorQueue] WITH(NOLOCK) 

我曾經想過,當消息從發起者到目標,發起者會消失。但它似乎正在填補。

我注意到,發起者中的消息具有2的'message_type_id','E'的'驗證'以及消息正文和被轉換的消息正文爲NULL。這裏都有一個message_type_name'http://schemas.microsoft.com/SQL/ServiceBroker/EndDialog'。

在目標數據庫端,這裏所採用的程序來從隊列中的消息:

CREATE PROCEDURE [dbo].[up_CarePayBrokerReceiveXml] 
AS 
BEGIN 
    SET NOCOUNT ON; 

    DECLARE @XML XML, @Response XML = 'OK', @ConversationHandle UNIQUEIDENTIFIER, @message_type_name SYSNAME, @message_body VARBINARY(MAX), @source_table VARCHAR(100) 
    DECLARE @Message VARCHAR(MAX), @Line INT, @Proc VARCHAR(MAX), @Exception VARCHAR(MAX) 

    WHILE (1 = 1) 
    BEGIN 
    -- Clear variables, as they may have been populated in previous loop. 
    SET @message_type_name = NULL 
    SET @message_body = NULL 
    SET @ConversationHandle = NULL 
    SET @source_table = NULL 

    BEGIN TRY 
     BEGIN TRAN 

     WAITFOR ( -- Pop off a message at a time, and add to storage table. 
      RECEIVE TOP (1) 
       @message_type_name = message_type_name 
      , @message_body = message_body 
      , @ConversationHandle = conversation_handle 
      , @source_table = CAST([message_body] AS XML).value('(/row/@SourceTable)[1]', 'varchar(50)') 
      FROM dbo.IcmsCarePayTargetQueue 
     ), TIMEOUT 3000; 

     IF @@ROWCOUNT = 0 
     BEGIN 
      ROLLBACK -- Complete the Transaction (Rollback, as opposeed to Commit, as there is nothing to commit). 
      BREAK 
     END 

     -- Code removed for example, but the fields are saved to a staging table in the database here... 

     -- Respond to Initiator 
     SEND ON CONVERSATION @ConversationHandle MESSAGE TYPE [//IcmsCarePay/Message/Response](@Response); 
     END CONVERSATION @ConversationHandle; 

     COMMIT -- End of Transaction 

    END TRY 
    BEGIN CATCH 
     -- End the conversation 
     END CONVERSATION @ConversationHandle WITH CLEANUP 

     -- Get details about the issue. 
     SELECT @Exception = ERROR_MESSAGE(), @Line = ERROR_LINE(), @Proc = ERROR_PROCEDURE(), @Message = 'proc: ' + @Proc + '; line: ' + CAST(@Line AS VARCHAR) + '; msg: ' + @Exception 
     SELECT @Message -- Displays on Concole when debugging. 

     -- Log the issue to the Application Log. 
     INSERT INTO dbo.ApplicationLog 
       (LogDate , 
       Thread , 
       Level , 
       Logger , 
       Message , 
       Exception 
      ) 
     VALUES (GETDATE() , -- LogDate - datetime 
       'None' , -- Thread - varchar(255) 
       'FATAL' , -- Level - varchar(50) 
       '____up_CarePayBrokerReceiveXml' , -- Logger - varchar(255) 
       @Message , -- Message - varchar(4000) 
       @Exception -- Exception - varchar(2000) 
      ) 
     COMMIT -- We have stored the erronous message, and popped it off the queue. Commit these changes. 
    END CATCH 
    END -- end while 

END 

爲什麼這些消息呆在那裏?消息

細節保留在啓動器隊列是:

Status: 1 
Priority: 5 
queuing_order: 395 
mess_sequence_number: 0 
service_name: //IcmsCarePay/Service/Initiator 
service_contract_name: //IcmsCarePay/Contract 
message_type_name: http://schemas.microsoft.com/SQL/ServiceBroker/EndDialog 
message_type_id: 2 
validation: E 
message_body: NULL 
casted_message_body: NULL 

回答

3

看起來你使用這些談話一次對話。您的目標存儲過程從目標隊列中檢索消息,然後關閉其對話框,但不會在啓動器隊列上處理它。

由於dialog是一個分佈式的東西,所以爲了關閉它,它必須在發起者和目標端都被關閉。當您的目標進程在目標上發出end conversation @Handle;時,Service Broker會將您提到的類型的消息發送給啓動程序,以通知它此特定對話框是歷史記錄。

正確完成後,啓動程序激活過程將收到此消息,在其側面發出相應的end conversation,並關閉對話框。

由於您不會在啓動器端處理任何消息,因此這些系統消息會在那裏累積。這裏

2解決方案是可行的:

  1. 手柄EndDialog消息。這實際上應該在雙方都完成,因爲對話可以在任何一方關閉。
  2. 重複使用對話框,以便您不必在每次需要發送內容時都創建一個新對話框。這將節省一些重要的資源,特別是在流量足夠大的情況下。

請注意,無論您是使用持久性對話框還是一次性對話框,都應該完成#1。

編輯:這是默認的處理過程的一個例子,從我的一個項目採取:

create procedure [dbo].[ssb_Queue_DefaultProcessor] 
(
    @Handle uniqueidentifier, 
    @MessageType sysname, 
    @Body xml, 
    @ProcId int 
) with execute as owner as 

set nocount, ansi_nulls, ansi_padding, ansi_warnings, concat_null_yields_null, quoted_identifier, arithabort on; 
set numeric_roundabort, xact_abort, implicit_transactions off; 

declare @Error int, @ErrorMessage nvarchar(2048); 

declare @Action varchar(20); 

begin try 

-- System stuff 
if @MessageType in (
    N'http://schemas.microsoft.com/SQL/ServiceBroker/EndDialog', 
    N'http://schemas.microsoft.com/SQL/ServiceBroker/Error' 
    ) begin 

    -- Depending on the actual message, action type will be different 
    if @MessageType = N'http://schemas.microsoft.com/SQL/ServiceBroker/EndDialog' begin 
     set @Action = 'PURGE'; 
    end else if @MessageType = N'http://schemas.microsoft.com/SQL/ServiceBroker/Error' 
     set @Action = 'CLOSE'; 

    -- Close the dialog 
    exec dbo.ssb_DialogPools_Maintain @Action = @Action, @DialogHandle = @Handle, @Error = @Error output, @ErrorMessage = @ErrorMessage output; 

    if nullif(@Error, 0) is not null 
     throw 50000, @ErrorMessage, 1; 

end else 
    -- Some unknown messages may end up here, log them 
    throw 50011, 'Unknown message type has been passed into default processor.', 1; 

end try 
begin catch 

if nullif(@Error, 0) is null 
    select @Error = error_number(), @ErrorMessage = error_message(); 

-- Don't try to resend messages from default processing 
exec dbo.ssb_Poison_Log @ErrorNumber = @Error, @ErrorMessage = @ErrorMessage, @MessageType = @MessageType, @MessageBody = @Body, @ProcId = @ProcId; 

end catch; 
return; 

它是從所有激活特效,當他們遇到比其他任何類型的消息稱爲了他們應該處理。 下面是這種活化過程中的一個例子:

create procedure [dbo].[ssb_QProcessor_Clients] 
with execute as owner as 


set nocount, ansi_nulls, ansi_padding, ansi_warnings, concat_null_yields_null, quoted_identifier, arithabort on; 
set numeric_roundabort, xact_abort, implicit_transactions off; 

declare @Handle uniqueidentifier, @MessageType sysname, @Body xml, @MessageTypeId int; 
declare @Error int, @ErrorMessage nvarchar(2048), @ProcId int = @@procid; 
declare @TS datetime2(4), @Diff int, @Delay datetime; 


-- Fast entry check for queue contents 
if not exists (select 0 from dbo.ssb_OY_Clients with (nolock)) 
    return; 

while exists (select 0 from sys.service_queues where name = 'ssb_OY_Clients' and is_receive_enabled = 1) begin 

    begin try 
    begin tran; 

    -- Receive something, if any 
    waitfor (
     receive top (1) @Handle = conversation_handle, 
      @MessageType = message_type_name, 
      @Body = message_body 
     from dbo.ssb_OY_Clients 
    ), timeout 3000; 

    if @Handle is null begin 

     -- Empty, get out 
     rollback; 
     break; 

    end; 

    -- Check for allowed message type 
    select @MessageTypeId = mt.Id 
    from dbo.ExportMessageTypes mt 
     inner join dbo.ExportSystems xs on xs.Id = mt.ExportSystemId 
    where mt.MessageTypeName = @MessageType 
     and xs.Name = N'AUDIT.OY.Clients'; 

    if @MessageTypeId is not null begin 

     -- Store the data 
     exec dbo.log_Clients @MessageType = @MessageType, @Body = @Body, @Error = @Error output, @ErrorMessage = @ErrorMessage output; 

     -- Check the result 
     if nullif(@Error, 0) is not null 
      throw 50000, @ErrorMessage, 1; 

    end else 
     -- Put it into default processor 
     exec dbo.ssb_Queue_DefaultProcessor @Handle = @Handle, @MessageType = @MessageType, @Body = @Body, @ProcId = @ProcId; 

    commit; 
    end try 
    begin catch 

    if nullif(@Error, 0) is null 
     select @Error = error_number(), @ErrorMessage = error_message(); 

    -- Check commitability of the transaction 
    if xact_state() = -1 
     rollback; 
    else if xact_state() = 1 
     commit; 

    -- Try to resend the message again 
    exec dbo.[ssb_Poison_Retry] @MessageType = @MessageType, @MessageBody = @Body, @ProcId = @ProcId, @ErrorNumber = @Error, @ErrorMessage = @ErrorMessage; 

    end catch; 

    -- Reset dialog handle 
    select @Handle = null, @Error = null, @ErrorMessage = null; 

end; 

-- Done! 
return; 

。當然,這是在這個例子中多一點可能比你所需要的,但我希望一般的方法是顯而易見的。並且您需要在發起者和目標上處理EndDialogError消息類型,因爲您永遠不知道它們將出現在哪裏。

+0

Thanks @RogerWolf - 我想選擇1.我不是100%確定'Handle EndDialog messages'是什麼意思。上述程序是否需要該代碼? – Craig

+0

我修改了問題以顯示接收方的閱讀過程。 – Craig

+0

我已添加END CONVERSATION @InitDlgHandle;在啓動過程中,在SEND之後,這已經「解決」了問題,因爲在啓動隊列中不再有消息出現,並且系統按照預期工作。這是好的,因爲我沒有處理來自目標隊列的任何響應? – Craig

相關問題