我幾個月來一直在玩Netflix的RxJava。反應式編程改變了我的整個編程方法。它確實帶來了最好的功能編程所提供的。SQLite不喜歡反應式編程?
但是,我想使用SQLite的反應式編程。 David Moten編寫了a great library來將RxJava集成到JDBC中。但SQLite似乎有一個問題。它不喜歡一個查詢推送一個ResultSet
,其中每個記錄迭代被轉換爲一個對象並驅動另一個查詢。
說我有兩個表
CREATE TABLE TABLE_ONE (
ID INTEGER PRIMARY KEY
NOT NULL,
VALUE INTEGER NOT NULL
);
CREATE TABLE TABLE_TWO (
ID INTEGER NOT NULL
PRIMARY KEY,
FOREIGN_ID INTEGER NOT NULL
REFERENCES TABLE_ONE ([KEY]),
VALUE INTEGER NOT NULL
);
我創建了一個單子,做一些INSERT父/母SELECT/INSERT子/ SELECT子樣的操作。
import com.github.davidmoten.rx.jdbc.ConnectionProviderFromUrl;
import com.github.davidmoten.rx.jdbc.Database;
import rx.Observable;
import java.sql.Connection;
public final class Test {
public static void main(String[] args) {
Connection con = new ConnectionProviderFromUrl("jdbc:sqlite:C:/Users/Thomas/test.db").get();
Database db = Database.from(con);
Observable<Integer> inputs = Observable.just(100,200,300);
db.update("INSERT INTO TABLE_ONE (VALUE) VALUES (?)")
.parameters(inputs)
.returnGeneratedKeys()
.getAs(Integer.class)
.flatMap(k -> db.select("SELECT * FROM TABLE_ONE WHERE ID = ?")
.parameter(k)
.get(rs -> new Type1(rs.getInt("ID"), rs.getInt("VALUE")))
).flatMap(t1 -> db.update("INSERT INTO TABLE_TWO (FOREIGN_ID,VALUE) VALUES (?,?)")
.parameter(t1.id)
.parameter(t1.value)
.returnGeneratedKeys()
.getAs(Integer.class)
).flatMap(k -> db.select("SELECT * FROM TABLE_TWO WHERE ID = ?")
.parameter(k)
.get(rs -> new Type2(rs.getInt("ID"), rs.getInt("FOREIGN_ID"), rs.getInt("VALUE")))
).subscribe(System.out::println, Throwable::printStackTrace);
db.close();
}
private static final class Type1 {
private final int id;
private final int value;
private Type1(int id, int value) {
this.id = id;
this.value = value;
}
}
private static final class Type2 {
private final int id;
private final int foreignId;
private final int value;
private Type2(int id, int foreignKey, int value) {
this.id = id;
this.foreignId = foreignKey;
this.value = value;
}
@Override
public String toString() {
return "Type2{" +
"id=" + id +
", foreignId=" + foreignId +
", value=" + value +
'}';
}
}
}
更具體地,這是這種情況發生於所有三個數字(100,200,300)的處理...
1) INSERT a TABLE_ONE record, get its primary key ID
2) SELECT that TABLE_ONE record with ID
3) Turn it into a Type1 Object
4) INSERT a TABLE_TWO record with Type1's `id for the foreign key (and get primary key ID)
5) SELECT TABLE_TWO record with ID
6) Turn it into a Type2 Object
這一切發生原子級對於每100,200,300倍的值和這個鏈中每個人都會發生4次更新/查詢。
不過,我得到一個SQLITE_INTERRUPT
錯誤
java.sql.SQLException: [SQLITE_INTERRUPT] Operation terminated by sqlite3_interrupt() (interrupted)
at org.sqlite.core.DB.newSQLException(DB.java:890)
at org.sqlite.core.DB.newSQLException(DB.java:901)
at org.sqlite.core.DB.throwex(DB.java:868)
at org.sqlite.jdbc3.JDBC3ResultSet.next(JDBC3ResultSet.java:93)
at com.github.davidmoten.rx.jdbc.QuerySelectProducer.processRow(QuerySelectProducer.java:112)
at com.github.davidmoten.rx.jdbc.QuerySelectProducer.requestSome(QuerySelectProducer.java:75)
但第一項通過鏈條推動成功,插入到兩個表,並打印雖然我有兩個值(200和300)去。
Type2{id=1, foreignId=1, value=100}
我的理論是,由於每個發射項目O
從一個查詢推到下一個,它將中斷和取消以前的查詢的迭代如圖X
QUERY OP 4----------------------O-
QUERY OP 3----------------O-----X-
QUERY OP 2------------O---X-------
QUERY OP 1-------O----X-----------
因此發出的第一項目會經過,但它會在其後面留下一些中斷的查詢來驅動當前項目,因此無法讓下一個項目成爲onNext()
'd,因爲查詢迭代被終止。
SQLite和RxJava的人,你們中的任何一個人都可以想辦法解決這個問題嗎?是否有可以配置的SQLite設置來阻止這種中斷?還是有一個RxJava構圖技巧可以做到防止中斷?
我還用上面的測試創建了一個簡單的Git回購。 https://github.com/thomasnield/rxjava_sqlite_test
是的,如果我用RxJava的'toList()','最後()',或在進行下一階段之前收集的所有記錄一些其他的攔截運營商,那是我在討論解決方法GitHub問題。 https://github.com/davidmoten/rxjava-jdbc/issues/45#issuecomment-150103664問題是這是反應世界中的一種反模式,並且令人不悅。我可以做這個解決方法,但希望有一個非阻塞的方式來實現這一點。 – tmn
這就是我不喜歡它的原因。但修復它需要深入改變SQLite的工作方式。它需要能夠爲每個查詢保留數據庫的副本,以便SELECT可以在COMMIT取代它時繼續遍歷它。這幾乎等同於MVCC /快照隔離。 –
那麼這並不理想。我想,使用超級輕量級技術總是有缺點,因爲我們希望他們在沒有任何開銷的情況下完成所有任務。我將發佈我的解決方案。 – tmn