目前,Siddhi 不支持子查詢。但是,您可以嘗試使用類似下面的查詢,來獲取全部記錄的計數以及匹配記錄的計數:
/* Name of this execution plan. */
@Plan:name('TestExecutionPlan')

/* Input stream: carries the id used by the delete query against EMPLOYEE_TABLE. */
@Import('DEL_STREAM:1.0.0')
define stream DEL (
    id int
);

/* Input stream: employee rows that are inserted into EMPLOYEE_TABLE. */
@Import('INSERT_STREAM:1.0.0')
define stream INST (
    EMPLOYEE_ID   int,
    EMPLOYEE_NAME string
);

/* Input stream: lookup requests; the queries in this plan read only EMPLOYEE_ID from it. */
@Import('CHECK_STREAM:1.0.0')
define stream CHECK (
    EMPLOYEE_ID   int,
    EMPLOYEE_NAME string
);

/* Output stream: per event id, the matched-row count and the total-row count. */
@Export('COUNTED_STREAM:1.0.0')
define stream COUNTED_STREAM (
    EVENT_ID      string,
    EMPLOYEE_ID   int,
    MATCHED_COUNT long,
    ALL_COUNT     long
);

/* Table holding the current set of employees. */
define table EMPLOYEE_TABLE (
    EMPLOYEE_ID   int,
    EMPLOYEE_NAME string
);

/* Trigger stream configured to fire at 'start'. */
define trigger START at 'start';
-- Every INST event is stored as a row of EMPLOYEE_TABLE.
from INST
    select EMPLOYEE_ID, EMPLOYEE_NAME
    insert into EMPLOYEE_TABLE;

-- Every DEL event removes the table rows whose EMPLOYEE_ID equals the event's id.
from DEL
    delete EMPLOYEE_TABLE
    on EMPLOYEE_TABLE.EMPLOYEE_ID == id;
-- Tag every incoming event with a fresh UUID and funnel it into PROCESSED
-- (EVENT_ID, EMPLOYEE_ID) so the counting queries can correlate their results.

-- Lookup requests keep their own EMPLOYEE_ID.
from CHECK
    select UUID() as EVENT_ID, EMPLOYEE_ID
    insert into PROCESSED;

-- The START trigger has no employee; -1 is used as EMPLOYEE_ID.
-- NOTE(review): presumably a sentinel that matches no table row - confirm.
from START
    select UUID() as EVENT_ID, -1 as EMPLOYEE_ID
    insert into PROCESSED;

-- Inserts are also counted, keyed by the inserted EMPLOYEE_ID.
from INST
    select UUID() as EVENT_ID, EMPLOYEE_ID
    insert into PROCESSED;

-- Deletes are also counted; the DEL id is exposed as EMPLOYEE_ID.
from DEL
    select UUID() as EVENT_ID, id as EMPLOYEE_ID
    insert into PROCESSED;
-- Hold each PROCESSED event in a 10-second time window and forward it to
-- INST_EXPIRED when it leaves the window (expired events only).
from PROCESSED#window.time(10 sec)
    select EVENT_ID, EMPLOYEE_ID
    insert expired events into INST_EXPIRED;
-- getting all count
-- The join has no `on` clause, so each PROCESSED event is paired with every
-- row of EMPLOYEE_TABLE: one ALL_STREAM event per table row.
from PROCESSED join EMPLOYEE_TABLE
    select PROCESSED.EVENT_ID, EMPLOYEE_TABLE.EMPLOYEE_ID
    insert into ALL_STREAM;
-- Pair each ALL_STREAM event with the single PROCESSED event held in the
-- length(1) window, taking EVENT_ID from the window side.
-- NOTE(review): there is no `on` clause, so EVENT_ID comes from whichever
-- PROCESSED event is in the window at that moment - confirm this is intended.
from PROCESSED#window.length(1) as p join ALL_STREAM as a
    select p.EVENT_ID, a.EMPLOYEE_ID
    insert into JOINED_ALL_STREAM;

-- Every 5 seconds, emit the number of joined rows collected per EVENT_ID.
from JOINED_ALL_STREAM#window.timeBatch(5 sec)
    select EVENT_ID, count() as COUNT
    group by EVENT_ID
    insert into COUNT_ALL_STREAM;
-- Attach the total count to the PROCESSED event it belongs to.
-- Both join inputs carry an EVENT_ID attribute, so every reference is
-- qualified with its stream; an unqualified EVENT_ID in the `on` clause is
-- ambiguous and leaves the choice of side to the parser.
-- NOTE(review): COUNT_ALL_STREAM is fed by a 5-second batch while the window
-- keeps only one PROCESSED event; an event superseded in the meantime finds no
-- match here - confirm this is acceptable.
from PROCESSED#window.length(1) join COUNT_ALL_STREAM
    on COUNT_ALL_STREAM.EVENT_ID == PROCESSED.EVENT_ID
select
    PROCESSED.EVENT_ID,
    PROCESSED.EMPLOYEE_ID,
    COUNT_ALL_STREAM.COUNT
insert into COUNT_ALL_INNER_STREAM;
-- For every PROCESSED event (e1), wait for the first event with the same
-- EVENT_ID on either COUNT_ALL_INNER_STREAM (e2) or INST_EXPIRED (e3).
-- Only e2 supplies COUNT; the downstream null check covers the case where
-- the pattern completed through e3 instead.
from every(e1=PROCESSED)
        -> e2=COUNT_ALL_INNER_STREAM[EVENT_ID == e1.EVENT_ID]
           or e3=INST_EXPIRED[EVENT_ID == e1.EVENT_ID]
    select e1.EVENT_ID, e1.EMPLOYEE_ID, e2.COUNT
    insert into FILTER_ALL_COUNT;
-- Normalise the total count in a single query: a null COUNT becomes 0L and
-- any other value passes through unchanged. coalesce() returns its first
-- non-null argument, so this replaces the former pair of complementary
-- `COUNT is null` / `not (COUNT is null)` filter queries.
from FILTER_ALL_COUNT
select EVENT_ID, EMPLOYEE_ID, coalesce(COUNT, 0L) as COUNT
insert into ALL_COUNT;
-- getting matched count
-- Emit one MATCHED_STREAM event for each EMPLOYEE_TABLE row whose EMPLOYEE_ID
-- equals that of the PROCESSED event. EMPLOYEE_ID exists on both the stream
-- and the table, so both sides of the condition are qualified explicitly
-- instead of relying on how an unqualified name is resolved.
from PROCESSED join EMPLOYEE_TABLE
    on EMPLOYEE_TABLE.EMPLOYEE_ID == PROCESSED.EMPLOYEE_ID
select
    PROCESSED.EVENT_ID,
    EMPLOYEE_TABLE.EMPLOYEE_ID
insert into MATCHED_STREAM;
-- Pair each MATCHED_STREAM event with the single PROCESSED event held in the
-- length(1) window, taking EVENT_ID from the window side.
-- NOTE(review): there is no `on` clause, so EVENT_ID comes from whichever
-- PROCESSED event is in the window at that moment - confirm this is intended.
from PROCESSED#window.length(1) as p join MATCHED_STREAM as m
    select p.EVENT_ID, m.EMPLOYEE_ID
    insert into JOINED_MATCHED_STREAM;

-- Every 5 seconds, emit the number of matched rows collected per EVENT_ID.
from JOINED_MATCHED_STREAM#window.timeBatch(5 sec)
    select EVENT_ID, count() as COUNT
    group by EVENT_ID
    insert into COUNT_MATCHED_STREAM;
-- Attach the matched count to the PROCESSED event it belongs to.
-- Both join inputs carry an EVENT_ID attribute, so every reference is
-- qualified with its stream; an unqualified EVENT_ID in the `on` clause is
-- ambiguous and leaves the choice of side to the parser.
-- NOTE(review): COUNT_MATCHED_STREAM is fed by a 5-second batch while the
-- window keeps only one PROCESSED event; an event superseded in the meantime
-- finds no match here - confirm this is acceptable.
from PROCESSED#window.length(1) join COUNT_MATCHED_STREAM
    on COUNT_MATCHED_STREAM.EVENT_ID == PROCESSED.EVENT_ID
select
    PROCESSED.EVENT_ID,
    PROCESSED.EMPLOYEE_ID,
    COUNT_MATCHED_STREAM.COUNT
insert into COUNT_MATCHED_INNER_STREAM;
-- For every PROCESSED event (e1), wait for the first event with the same
-- EVENT_ID on either COUNT_MATCHED_INNER_STREAM (e2) or INST_EXPIRED (e3).
-- Only e2 supplies COUNT; the downstream null check covers the case where
-- the pattern completed through e3 instead.
from every(e1=PROCESSED)
        -> e2=COUNT_MATCHED_INNER_STREAM[EVENT_ID == e1.EVENT_ID]
           or e3=INST_EXPIRED[EVENT_ID == e1.EVENT_ID]
    select e1.EVENT_ID, e1.EMPLOYEE_ID, e2.COUNT
    insert into FILTER_MATCHED_COUNT;
-- Normalise the matched count in a single query: a null COUNT becomes 0L and
-- any other value passes through unchanged. coalesce() returns its first
-- non-null argument, so this replaces the former pair of complementary
-- `COUNT is null` / `not (COUNT is null)` filter queries.
from FILTER_MATCHED_COUNT
select EVENT_ID, EMPLOYEE_ID, coalesce(COUNT, 0L) as COUNT
insert into MATCHED_COUNT;
-- joining both counts
-- Combine the total and matched counts that share an EVENT_ID into one
-- COUNTED_STREAM row. ALL_COUNT and MATCHED_COUNT both carry EVENT_ID, so the
-- join condition names both sides explicitly rather than comparing against an
-- unqualified (ambiguous) EVENT_ID.
-- Only the latest ALL_COUNT event is retained (length(1) window); each
-- MATCHED_COUNT event is compared against that one event.
from ALL_COUNT#window.length(1) join MATCHED_COUNT
    on MATCHED_COUNT.EVENT_ID == ALL_COUNT.EVENT_ID
select
    MATCHED_COUNT.EVENT_ID,
    MATCHED_COUNT.EMPLOYEE_ID,
    MATCHED_COUNT.COUNT as MATCHED_COUNT,
    ALL_COUNT.COUNT as ALL_COUNT
insert into COUNTED_STREAM;