Oracle Advanced Queuing 触发器入列和异步通知消息出列
1. 官方文档:
https://docs.oracle.com/database/121/ADQUE/aq_opers.htm
2. 授权用户操作dbms_aq/dbms_aqadm 的权限
grant execute on dbms_aq to C##CS_DEV;
grant execute on dbms_aqadm to C##CS_DEV;
3. 创建payload_type
create or replace TYPE client_queue_payload_type AS OBJECT
(
client_id number(9),
method varchar2(20)
);
4. 创建并启动队列
begin
--创建队列表
dbms_aqadm.create_queue_table(queue_table => 'client_queue_table',
queue_payload_type => 'client_queue_payload_type',
multiple_consumers => false);
--创建队列
DBMS_AQADM.CREATE_QUEUE(queue_name => 'client_queue',
queue_table => 'client_queue_table');
--启动队列
DBMS_AQADM.START_QUEUE(queue_name => 'client_queue');
end;
5. 创建触发器进行入列
create or replace trigger trigger_client
after insert or update or delete on client
for each row
declare
r_enqueue_options dbms_aq.enqueue_options_t;
r_message_properties dbms_aq.message_properties_t;
v_message_handle RAW(16);
o_payload client_queue_payload_type;
v_client_id number(9);
v_method varchar2(20);
begin
if inserting then
v_client_id := :new.client_id;
v_method := 'I';
elsif deleting then
v_client_id := :old.client_id;
v_method := 'D';
elsif updating then
v_client_id := :new.client_id;
v_method := 'U';
end if;
o_payload := client_queue_payload_type(v_client_id, v_method);
dbms_aq.enqueue(queue_name => 'client_queue',
enqueue_options => r_enqueue_options,
message_properties => r_message_properties,
payload => o_payload,
msgid => v_message_handle);
end trigger_client;
6. 创建异步通知callback存储过程
create or replace procedure client_queue_callback_pro(context RAW,
reginfo sys.aq$_reg_info,
descr sys.aq$_descriptor,
payload RAW,
payloadl NUMBER) is
r_dequeue_options dbms_aq.dequeue_options_t;
r_message_properties dbms_aq.message_properties_t;
v_message_handle RAW(16);
o_payload client_queue_payload_type;
begin
r_dequeue_options.msgid := descr.msg_id;
r_dequeue_options.consumer_name := descr.consumer_name;
dbms_aq.dequeue(queue_name => descr.queue_name,
dequeue_options => r_dequeue_options,
message_properties => r_message_properties,
payload => o_payload,
msgid => v_message_handle);
insert into client_log
(client_id, method, deal_date)
values
(o_payload.client_id, o_payload.method, sysdate);
end client_queue_callback_pro;
7. 注册异步通知
declare
reginfo sys.aq$_reg_info;
reg_list sys.aq$_reg_info_list;
begin
reginfo := sys.aq$_reg_info('client_queue',
DBMS_AQ.NAMESPACE_AQ,
'plsql://client_queue_callback_pro?PR=0',
HEXTORAW('FF'));
reg_list := sys.aq$_reg_info_list(reginfo);
dbms_aq.register(reg_list => reg_list, reg_count => 1);
-- dbms_aq.unregister(reg_list => reg_list, reg_count => 1);
end;
8. 测试
插入操作:
insert into client
(client_id, name, sex, age)
values
('2', 'Lily', 'F', 25);
修改操作:
update client a set a.name = 'Lily Liu' where client_id = '2';
删除操作:
delete from client a where a.client_id='2';
9. 清空消息队列
https://docs.oracle.com/database/121/ADQUE/aq_admin.htm#ADQUE0812
DECLARE
v_options sys.dbms_aqadm.aq$_purge_options_t;
BEGIN
SYS.DBMS_AQADM.STOP_QUEUE(QUEUE_NAME => 'client_queue');
dbms_aqadm.purge_queue_table('client_queue_table', NULL, v_options);
SYS.DBMS_AQADM.START_QUEUE(QUEUE_NAME => 'client_queue');
END;
/