2009-07-03 48 views
0

我想創建一個隊列和一個回調,當一個消息排隊時觸發,但我無法獲得回調觸發。我究竟做錯了什麼?在PL/SQL中創建隊列訂閱者的語法是什麼?

我有一個排隊消息的觸發器,我可以在隊列消息表中看到它,我可以手動將它出隊並處理它,我只是無法讓回調在入隊時觸發。

BEGIN  
DBMS_AQADM.CREATE_QUEUE_TABLE (
    queue_table  => 'queue_message_table', 
    queue_payload_type => 'queue_message_type', 
    multiple_consumers => TRUE); 

DBMS_AQADM.CREATE_QUEUE (
    queue_name => 'message_queue', 
    queue_table => 'queue_message_table'); 
DBMS_AQADM.START_QUEUE (queue_name => 'message_queue'); 
END; 

CREATE OR REPLACE PROCEDURE queue_callback(
    context RAW, reginfo SYS.AQ$_REG_INFO, descr SYS.AQ$_DESCRIPTOR, payload RAW, payloadl NUMBER) AS 

    queue_options  DBMS_AQ.DEQUEUE_OPTIONS_T; 
    message_properties DBMS_AQ.MESSAGE_PROPERTIES_T; 
    my_message   queue_message_type; 
    ret     varchar2(200); 
    message_id   RAW(16); 
BEGIN 
    DBMS_OUTPUT.PUT_LINE('Callback'); 
    queue_options.msgid := descr.msg_id; 
    queue_options.consumer_name := descr.consumer_name; 

    DBMS_AQ.DEQUEUE(
     queue_name => descr.queue_name, 
     dequeue_options => queue_options, 
     message_properties => message_properties, 
     payload => my_message, 
     msgid => message_id); 
    ret := handle_message(my_message); 
    commit; 
END; 

BEGIN 
    DBMS_AQADM.ADD_SUBSCRIBER (queue_name => 'message_queue', 
    subscriber => SYS.AQ$_AGENT('queue_subscriber', 'message_queue',NULL)); 
    DBMS_AQ.REGISTER (
    SYS.AQ$_REG_INFO_LIST(
     SYS.AQ$_REG_INFO(
     'MESSAGE_QUEUE:QUEUE_SUBSCRIBER', 
     DBMS_AQ.NAMESPACE_AQ, 
     'plsql://QUEUE_CALLBACK', 
     HEXTORAW('FF') 
    ) 
    ), 1 
); 
END; 
+0

您是否解決了這個問題?我有同樣的問題:http://stackoverflow.com/questions/30502535/oracle-advance-queue-dequeue-not-working – SaintLike 2015-05-29 13:28:46

回答

1

乍一看,似乎你既沒有開始隊列(dbms_aqadm.start_queue),你也不是什麼入列到它(dbms_aq.enqueue)。我想推薦以下demo

+0

肯定有消息使其進入隊列,我可以看到他們。回調創建語法中有些錯誤。 – 2009-07-03 18:11:42

1

您需要小心數據庫版本。已經報告了一些關於issues with Oracle Aq的錯誤。 特別是我遵循this link構建自己的示例,在Oracle 11gR2企業數據庫中執行演示。我能夠入隊,出隊,清除隊列,但用Dbms_Aq.Register創建的偵聽器不起作用。 我運行了下載Oracle 11g R2 xe數據庫的同一個例子,它工作正常。

相同的例子在Oracle 10gR2實例中運行並且完美運行。

有一些東西,你需要小心使用水性上:

  • 使用適當的參數添加用戶
  • 使用適當的命名空間Dbms_Aq.Register註冊聽衆
  • 使用多個使用者標誌聲明隊列表
  • 使用適當的權限來打包並處理隊列
  • 使用隊列的限定名稱在某些情況下,如果它沒有起作用。

' 首先創建模式

connect/as sysdba 
-- @?/rdbms/admin/dbmsaqad.sql --(install if you don't have aq installed yet) 

-- create the user and permissions 
create user aqadmin identified by aqadmin default tablespace users temporary tablespace temp; 
GRANT create session TO aqadmin; 
grant connect, resource to aqadmin; 
GRANT aq_administrator_role TO aqadmin IDENTIFIED BY aqadmin; 
GRANT execute ON dbms_aq TO aqadmin; 
GRANT execute ON dbms_aqadm TO aqadmin; 

創建DDL對象

CREATE TABLE demo_queue_message_table 
(message VARCHAR2(4000)); 

創建AQ-對象

create or replace type demo_queue_payload_type as object(message varchar2(4000)) ; 
/

begin 
    DBMS_AQADM.CREATE_QUEUE_TABLE (queue_table => 'demo_queue_table', queue_payload_type => 'demo_queue_payload_type',multiple_consumers => TRUE); 
    DBMS_AQADM.CREATE_QUEUE (queue_name => 'demo_queue', queue_table => 'demo_queue_table'); 
    DBMS_AQADM.START_QUEUE('demo_queue'); 
end; 
/

CREATE or replace PROCEDURE demo_queue_callback_procedure(
       context RAW, 
       reginfo SYS.AQ$_REG_INFO, 
       descr SYS.AQ$_DESCRIPTOR, 
       payload RAW, 
       payloadl NUMBER 
       ) AS 

    r_dequeue_options DBMS_AQ.DEQUEUE_OPTIONS_T; 
    r_message_properties DBMS_AQ.MESSAGE_PROPERTIES_T; 
    v_message_handle  RAW(16); 
    o_payload   demo_queue_payload_type; 

BEGIN 

    r_dequeue_options.msgid := descr.msg_id; 
    r_dequeue_options.consumer_name := descr.consumer_name; 

    DBMS_AQ.DEQUEUE(
     queue_name   => descr.queue_name, 
     dequeue_options => r_dequeue_options, 
     message_properties => r_message_properties, 
     payload   => o_payload, 
     msgid    => v_message_handle 
    ); 

    INSERT INTO demo_queue_message_table (message) 
    VALUES ('Message [' || o_payload.message || '] ' || 
      'dequeued at [' || TO_CHAR(SYSTIMESTAMP, 
             'DD-MON-YYYY HH24:MI:SS.FF3') || ']'); 
    COMMIT; 

END; 
/


BEGIN 

    DBMS_AQADM.ADD_SUBSCRIBER (
     queue_name => 'demo_queue', 
     subscriber => SYS.AQ$_AGENT(
         'demo_queue_subscriber', 
         NULL, 
         NULL) 
    ); 

    DBMS_AQ.REGISTER (
     SYS.AQ$_REG_INFO_LIST(
      SYS.AQ$_REG_INFO(
      'DEMO_QUEUE:DEMO_QUEUE_SUBSCRIBER', 
      DBMS_AQ.NAMESPACE_AQ, 
      'plsql://DEMO_QUEUE_CALLBACK_PROCEDURE', 
      HEXTORAW('FF') 
      ) 
     ), 
     1 
     ); 
END; 
/

最後測試隊列

DECLARE 

    r_enqueue_options DBMS_AQ.ENQUEUE_OPTIONS_T; 
    r_message_properties DBMS_AQ.MESSAGE_PROPERTIES_T; 
    v_message_handle  RAW(16); 
    o_payload   demo_queue_payload_type; 

BEGIN 

    o_payload := demo_queue_payload_type(
        TO_CHAR(SYSTIMESTAMP, 'DD-MON-YYYY HH24:MI:SS.FF3') 
        ); 

    DBMS_AQ.ENQUEUE(
     queue_name   => 'demo_queue', 
     enqueue_options => r_enqueue_options, 
     message_properties => r_message_properties, 
     payload   => o_payload, 
     msgid    => v_message_handle 
    ); 

    COMMIT; 

END; 
/
相關問題