Siddhi: I am not getting any response from an aggregation query

I have a simple query like this:
define stream myEventStream (userId string, price int);
define stream outputStream (avgPrice double, userId string);
@info(name = 'aQuery')
from myEventStream#window.time(5000)
select avg(price) as avgPrice, userId group by userId
insert into outputStream;
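As a rough illustration of what the query above is meant to compute, here is a plain-Java sketch of a per-user average over a sliding 5000 ms window. It does not use Siddhi and is not Siddhi's implementation. The class `SlidingAvg` and its `add` method are hypothetical names introduced only for this example, and it assumes that `#window.time(5000)` behaves as a sliding window that drops events once they are 5000 ms old and produces a new average on every arriving event.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical helper (not part of Siddhi): per-user average over a sliding time window. */
public class SlidingAvg {

    /** One price observation with its arrival time in milliseconds. */
    private static final class Sample {
        final long timestamp;
        final int price;

        Sample(long timestamp, int price) {
            this.timestamp = timestamp;
            this.price = price;
        }
    }

    private final long windowMillis;
    private final Map<String, Deque<Sample>> samplesByUser = new HashMap<>();

    public SlidingAvg(long windowMillis) {
        this.windowMillis = windowMillis;
    }

    /** Adds one event and returns that user's average over the events still inside the window. */
    public double add(String userId, int price, long nowMillis) {
        Deque<Sample> samples = samplesByUser.computeIfAbsent(userId, k -> new ArrayDeque<>());
        // Assumption: an event expires once it is windowMillis old or older.
        while (!samples.isEmpty() && nowMillis - samples.peekFirst().timestamp >= windowMillis) {
            samples.pollFirst();
        }
        samples.addLast(new Sample(nowMillis, price));
        long sum = 0;
        for (Sample s : samples) {
            sum += s.price;
        }
        return (double) sum / samples.size();
    }

    public static void main(String[] args) {
        SlidingAvg avg = new SlidingAvg(5000);
        System.out.println(avg.add("user1", 10, 0));    // only 10 is in the window
        System.out.println(avg.add("user1", 20, 1000)); // 10 and 20 are in the window
        System.out.println(avg.add("user1", 30, 6000)); // 10 and 20 have expired
    }
}
```

Under these assumptions, a value is produced for every incoming event rather than once per 5-second interval.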
When I add a query callback, I do not get any response from it:
runtime.addCallback("aQuery", new QueryCallback() {
    @Override
    public void receive(long timeStamp, Event[] inEvents, Event[] removeEvents) {
        EventPrinter.print(timeStamp, inEvents, removeEvents);
    }
});
I produce the messages in another thread:
final AtomicInteger counter = new AtomicInteger(0);
final Random rnd = new Random(System.currentTimeMillis());
ExecutorService executor = Executors.newSingleThreadExecutor();
executor.submit(() -> {
    while (counter.getAndIncrement() < 100) {
        try {
            handler.send(new Object[]{"user1", rnd.nextInt(100)});
            handler.send(new Object[]{"user2", rnd.nextInt(100)});
            handler.send(new Object[]{"user3", rnd.nextInt(100)});
            System.out.println("Sent: " + counter.get());
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
    }
});
I expect a result every 5 seconds. What am I missing here? Please help. Thanks.