2015-10-22 33 views
0

我幾個月來一直在玩Netflix的RxJava。反應式編程改變了我的整個編程方法。它確實帶來了最好的功能編程所提供的。SQLite不喜歡反應式編程?

但是,我想使用SQLite的反應式編程。 David Moten編寫了a great library來將RxJava集成到JDBC中。但SQLite似乎有一個問題。它不喜歡一個查詢推送一個ResultSet,其中每個記錄迭代被轉換爲一個對象並驅動另一個查詢。

說我有兩個表

CREATE TABLE TABLE_ONE (
    ID INTEGER PRIMARY KEY 
        NOT NULL, 
    VALUE INTEGER NOT NULL 
); 

CREATE TABLE TABLE_TWO (
    ID   INTEGER NOT NULL 
         PRIMARY KEY, 
    FOREIGN_ID INTEGER NOT NULL 
         REFERENCES TABLE_ONE ([KEY]), 
    VALUE  INTEGER NOT NULL 
); 

我創建了一個單子,做一些INSERT父/母SELECT/INSERT子/ SELECT子樣的操作。

import com.github.davidmoten.rx.jdbc.ConnectionProviderFromUrl; 
import com.github.davidmoten.rx.jdbc.Database; 
import rx.Observable; 

import java.sql.Connection; 

public final class Test { 

    public static void main(String[] args) { 

     Connection con = new ConnectionProviderFromUrl("jdbc:sqlite:C:/Users/Thomas/test.db").get(); 
     Database db = Database.from(con); 

     Observable<Integer> inputs = Observable.just(100,200,300); 

     db.update("INSERT INTO TABLE_ONE (VALUE) VALUES (?)") 
       .parameters(inputs) 
       .returnGeneratedKeys() 
       .getAs(Integer.class) 
       .flatMap(k -> db.select("SELECT * FROM TABLE_ONE WHERE ID = ?") 
           .parameter(k) 
           .get(rs -> new Type1(rs.getInt("ID"), rs.getInt("VALUE"))) 
       ).flatMap(t1 -> db.update("INSERT INTO TABLE_TWO (FOREIGN_ID,VALUE) VALUES (?,?)") 
           .parameter(t1.id) 
           .parameter(t1.value) 
           .returnGeneratedKeys() 
           .getAs(Integer.class) 
       ).flatMap(k -> db.select("SELECT * FROM TABLE_TWO WHERE ID = ?") 
           .parameter(k) 
           .get(rs -> new Type2(rs.getInt("ID"), rs.getInt("FOREIGN_ID"), rs.getInt("VALUE"))) 
       ).subscribe(System.out::println, Throwable::printStackTrace); 

     db.close(); 
    } 
    private static final class Type1 { 
     private final int id; 
     private final int value; 
     private Type1(int id, int value) { 
      this.id = id; 
      this.value = value; 
     } 
    } 
    private static final class Type2 { 
     private final int id; 
     private final int foreignId; 
     private final int value; 
     private Type2(int id, int foreignKey, int value) { 
      this.id = id; 
      this.foreignId = foreignKey; 
      this.value = value; 
     } 
     @Override 
     public String toString() { 
      return "Type2{" + 
        "id=" + id + 
        ", foreignId=" + foreignId + 
        ", value=" + value + 
        '}'; 
     } 
    } 
} 

更具體地,這是這種情況發生於所有三個數字(100,200,300)的處理...

1) INSERT a TABLE_ONE record, get its primary key ID 
2) SELECT that TABLE_ONE record with ID 
3) Turn it into a Type1 Object 
4) INSERT a TABLE_TWO record with Type1's `id for the foreign key (and get primary key ID) 
5) SELECT TABLE_TWO record with ID 
6) Turn it into a Type2 Object 

這一切發生原子級對於每100,200,300倍的值和這個鏈中每個人都會發生4次更新/查詢。

不過,我得到一個SQLITE_INTERRUPT錯誤

java.sql.SQLException: [SQLITE_INTERRUPT] Operation terminated by sqlite3_interrupt() (interrupted) 
    at org.sqlite.core.DB.newSQLException(DB.java:890) 
    at org.sqlite.core.DB.newSQLException(DB.java:901) 
    at org.sqlite.core.DB.throwex(DB.java:868) 
    at org.sqlite.jdbc3.JDBC3ResultSet.next(JDBC3ResultSet.java:93) 
    at com.github.davidmoten.rx.jdbc.QuerySelectProducer.processRow(QuerySelectProducer.java:112) 
    at com.github.davidmoten.rx.jdbc.QuerySelectProducer.requestSome(QuerySelectProducer.java:75) 

但第一項通過鏈條推動成功,插入到兩個表,並打印雖然我有兩個值(200和300)去。

Type2{id=1, foreignId=1, value=100} 

我的理論是,由於每個發射項目O從一個查詢推到下一個,它將中斷和取消以前的查詢的迭代如圖X

QUERY OP 4----------------------O- 
QUERY OP 3----------------O-----X- 
QUERY OP 2------------O---X------- 
QUERY OP 1-------O----X----------- 

因此發出的第一項目會經過,但它會在其後面留下一些中斷的查詢來驅動當前項目,因此無法讓下一個項目成爲onNext()'d,因爲查詢迭代被終止。

SQLite和RxJava的人,你們中的任何一個人都可以想辦法解決這個問題嗎?是否有可以配置的SQLite設置來阻止這種中斷?還是有一個RxJava構圖技巧可以做到防止中斷?

我還用上面的測試創建了一個簡單的Git回購。 https://github.com/thomasnield/rxjava_sqlite_test

回答

1

似乎與使用SQLite的事實有關,您不能擁有一個優秀的遊標來說一個SELECT並且同時也是COMMIT,這在默認自動提交中的INSERT和UPDATE之後是隱含的模式。這可以防止簡單地通過遊標循環遍歷SELECT所產生的行並執行INSERT。解決的辦法是簡單地獲取所有結果,然後遍歷它們。我不喜歡它。

我不知道該庫,但看了一些部分後,我確定它使用遊標懶洋洋地獲取結果,這是什麼導致的問題。你可以通過強制它獲取每個SELECT的所有結果來解決這個問題。

+0

是的,如果我用RxJava的'toList()','最後()',或在進行下一階段之前收集的所有記錄一些其他的攔截運營商,那是我在討論解決方法GitHub問題。 https://github.com/davidmoten/rxjava-jdbc/issues/45#issuecomment-150103664問題是這是反應世界中的一種反模式,並且令人不悅。我可以做這個解決方法,但希望有一個非阻塞的方式來實現這一點。 – tmn

+0

這就是我不喜歡它的原因。但修復它需要深入改變SQLite的工作方式。它需要能夠爲每個查詢保留數據庫的副本,以便SELECT可以在COMMIT取代它時繼續遍歷它。這幾乎等同於MVCC /快照隔離。 –

+0

那麼這並不理想。我想,使用超級輕量級​​技術總是有缺點,因爲我們希望他們在沒有任何開銷的情況下完成所有任務。我將發佈我的解決方案。 – tmn

1

雖然這絕對不是理想,但丹的解釋似乎證實了我所擔心的。幸運的是有辦法解決這個問題。即使用於此目的的toList()等運營商對Rx有些反模式,它也遠比不使用Rx好。

這裏我用toList().concatMap(Observable::from)來收集所有的排放物,並在每個階段之間再次將它們弄平。

import com.github.davidmoten.rx.jdbc.ConnectionProviderFromUrl; 
import com.github.davidmoten.rx.jdbc.Database; 
import rx.Observable; 

import java.sql.Connection; 

public final class Test { 

    public static void main(String[] args) { 

     Connection con = new ConnectionProviderFromUrl("jdbc:sqlite:C:/Users/Thomas/test.db").get(); 
     Database db = Database.from(con); 

     Observable<Integer> inputs = Observable.just(100,200,300); 

     db.update("INSERT INTO TABLE_ONE (VALUE) VALUES (?)") 
       .parameters(inputs) 
       .returnGeneratedKeys() 
       .getAs(Integer.class) 
       .toList().concatMap(Observable::from) 
       .concatMap(k -> db.select("SELECT * FROM TABLE_ONE WHERE ID = ?") 
           .parameter(k) 
           .get(rs -> new Type1(rs.getInt("ID"), rs.getInt("VALUE"))) 
       ) 
       .toList().concatMap(Observable::from) 
       .concatMap(t1 -> db.update("INSERT INTO TABLE_TWO (FOREIGN_ID,VALUE) VALUES (?,?)") 
           .parameter(t1.id) 
           .parameter(t1.value) 
           .returnGeneratedKeys() 
           .getAs(Integer.class) 
       ).toList().concatMap(Observable::from) 
       .concatMap(k -> db.select("SELECT * FROM TABLE_TWO WHERE ID = ?") 
           .parameter(k) 
           .get(rs -> new Type2(rs.getInt("ID"), rs.getInt("FOREIGN_ID"), rs.getInt("VALUE"))) 
       ).subscribe(System.out::println, Throwable::printStackTrace); 

     db.close(); 
    } 
    private static final class Type1 { 
     private final int id; 
     private final int value; 
     private Type1(int id, int value) { 
      this.id = id; 
      this.value = value; 
     } 
    } 
    private static final class Type2 { 
     private final int id; 
     private final int foreignId; 
     private final int value; 
     private Type2(int id, int foreignKey, int value) { 
      this.id = id; 
      this.foreignId = foreignKey; 
      this.value = value; 
     } 
     @Override 
     public String toString() { 
      return "Type2{" + 
        "id=" + id + 
        ", foreignId=" + foreignId + 
        ", value=" + value + 
        '}'; 
     } 
    } 
}