對於Rx專家,我有一個有趣的問題。我有一個關係表保存有關事件的信息。一個事件由id,發生的類型和時間組成。在我的代碼中,我需要在某個可能很寬的時間範圍內獲取所有事件。使用Rx批處理大型結果集
SELECT * FROM events WHERE event.time > :before AND event.time < :after ORDER BY time LIMIT :batch_size
爲了提高可靠性並處理大型結果集,我查詢批量大小的記錄:batch_size。現在,我想寫一個函數,給出:before和:after,將返回一個代表結果集的Observable。
Observable<Event> getEvents(long before, long after);
在內部,函數應該批量查詢數據庫。沿時間尺度的事件分佈是未知的。因此,解決批處理的自然方法是: 如果結果不爲空,則獲取前N條記錄 ,將最後一條記錄的時間用作新的'before'參數,並提取下N條記錄;否則在 終止 如果結果不是空的,使用最後一個記錄的時間作爲一個新的'before'參數,並獲取下N個記錄;否則終止 ......等等(這個想法應該很清楚)
我的問題是:
是否有表達更高級別的可觀測元(過濾器/圖而言,這功能的方法/ flatMap /掃描/範圍等),而不明確使用訂閱者?
到目前爲止,我沒能做到這一點,並提出了以下簡單的代碼來代替:
private void observeGetRecords(long before, long after, Subscriber<? super Event> subscriber) {
long start = before;
while (start < after) {
final List<Event> records;
try {
records = getRecordsByRange(start, after);
} catch (Exception e) {
subscriber.onError(e);
return;
}
if (records.isEmpty()) break;
records.forEach(subscriber::onNext);
start = Iterables.getLast(records).getTime();
}
subscriber.onCompleted();
}
public Observable<Event> getRecords(final long before, final long after) {
return Observable.create(subscriber -> observeGetRecords(before, after, subscriber));
}
這裏,getRecordsByRange實現使用DBI SELECT查詢並返回一個列表。此代碼工作正常,但缺乏高級Rx構造的優雅。
注意:我知道我可以通過DBI中的SELECT查詢返回Iterator。但是,我不想那樣做,而寧願運行多個查詢。這種計算不一定是原子的,所以事務隔離的問題是不相關的。
我不完全理解爲什麼你會想從時間繼續最後一批。給定tstart,傾向於,如果此範圍內的數據以tdata結尾,爲什麼要從tdata查詢?目前tdata和趨勢之間沒有任何數據可用。 – akarnokd
所以這就是爲什麼我檢查'records'列表是否爲空,並且如果是的話就從循環中斷開。 – GrumpyCat