Batching stored procedures in PostgreSQL

I need to save a lot of entities to the database. Saving an entity involves adding rows to different tables, with keys autogenerated by inserting a row in one table being used for inserting some row into another table. Such logic made me create and use a stored procedure. Calling this stored procedure separately for each entity (i.e. via statement.execute(...)) works fine, except that there are billions of entities to save. So I tried to do it in batches. However, in the batch case, executing the batch makes an org.postgresql.util.PSQLException get thrown with the message 'A result was returned when none was expected.'

My stored procedure looks like this:

CREATE OR REPLACE FUNCTION insertSentence(warcinfoID varchar, recordID varchar, sentence varchar, 
    sent_timestamp bigint, sect_ids smallint[]) RETURNS void AS $$ 
DECLARE 
    warcinfoIdId integer := 0; 
    recordIdId integer := 0; 
    sentId integer := 0; 
    id integer := 0; 
BEGIN 
    SELECT warcinfo_id_id INTO warcinfoIdId FROM warcinfo_id WHERE warcinfo_id_value = warcinfoID; 
    IF NOT FOUND THEN 
     INSERT INTO warcinfo_id (warcinfo_id_value) VALUES (warcinfoID) 
      RETURNING warcinfo_id_id INTO STRICT warcinfoIdId; 
    END IF; 
    SELECT record_id_id INTO recordIdId FROM record_id WHERE record_id_value = recordID; 
    IF NOT FOUND THEN 
     INSERT INTO record_id (record_id_value) VALUES (recordID) 
      RETURNING record_id_id INTO STRICT recordIdId; 
    END IF; 
    LOOP 
     SELECT sent_id INTO sentId FROM sentence_text 
      WHERE md5(sent_text) = md5(sentence) AND sent_text = sentence; 
     EXIT WHEN FOUND; 
     BEGIN 
      INSERT INTO sentence_text (sent_text) VALUES (sentence) RETURNING sent_id INTO STRICT sentId; 
     EXCEPTION WHEN unique_violation THEN 
      sentId := 0; 
     END; 
    END LOOP; 
    INSERT INTO sentence_occurrence (warcinfo_id, record_id, sent_id, timestamp, sect_ids) 
     VALUES (warcinfoIdId, recordIdId, sentId, TO_TIMESTAMP(sent_timestamp), sect_ids) 
     RETURNING entry_id INTO STRICT id; 
END; 
$$ LANGUAGE plpgsql; 

And the Scala code looks like this:

def partition2DB(iterator: Iterator[(String, String, String, Long, Array[Int])]): Unit = {
  Class.forName(driver)
  val conn = DriverManager.getConnection(connectionString)

  try {
    val statement = conn.createStatement()
    var i = 0
    iterator.foreach(r => {
      i += 1
      statement.addBatch(
        "select insertSentence('%s', '%s', '%s', %d, '{%s}');".format(
          r._1, r._2, r._3.replaceAll("'", "''"), r._4, r._5.mkString(","))
      )
      if (i % 1000 == 0) statement.executeBatch()
    })
    if (i % 1000 != 0) statement.executeBatch()
  } catch {
    case e: SQLException => println("exception caught: " + e.getNextException())
  } finally {
    conn.close()
  }
}

Strangely, even though statement.executeBatch() throws an exception, it saves the entities processed before that point. So this workaround makes things work:

def partition2DB(iterator: Iterator[(String, String, String, Long, Array[Int])]): Unit = {
  Class.forName(driver)
  val conn = DriverManager.getConnection(connectionString)

  try {
    var statement = conn.createStatement()
    var i = 0
    iterator.foreach(r => {
      i += 1
      statement.addBatch(
        "select insertSentence('%s', '%s', '%s', %d, '{%s}');".format(
          r._1, r._2, r._3.replaceAll("'", "''"), r._4, r._5.mkString(","))
      )
      if (i % 1000 == 0) {
        i = 0
        try {
          statement.executeBatch()
        } catch {
          case e: SQLException => statement = conn.createStatement()
        }
      }
    })
    if (i % 1000 != 0) {
      try {
        statement.executeBatch()
      } catch {
        case e: SQLException => statement = conn.createStatement()
      }
    }
  } catch {
    case e: SQLException => println("exception caught: " + e.getNextException())
  } finally {
    conn.close()
  }
}

However, I would rather not rely on the undocumented PostgreSQL behaviour I am currently using. I have seen that other people have run into this problem as well.

Can anyone suggest a solution?

Answers


Strangely, even though statement.executeBatch() throws an exception, it saves the entities before this.

That is because you are not wrapping the batch in a transaction. IIRC, the JDBC spec doesn't say whether a batch should be implicitly wrapped in a transaction if one isn't already in progress or fired as individual statements, nor whether the implementation should carry on after an error occurs.

To get well-defined behaviour (and better performance), wrap the batch in a transaction.

statement.addBatch(
  "select insertSentence('%s', '%s', '%s', %d, '{%s}');".format(
    r._1, r._2, r._3.replaceAll("'", "''"), r._4, r._5.mkString(","))
)

No! Step away from the keyboard! Come on, you're not a PHP programmer :p

You know better than to interpolate strings into SQL. Don't do it. Use a PreparedStatement. Besides being safer and more secure, it will also be faster, because PgJDBC only has to send one statement for parsing and can then re-use it. PreparedStatement is very well suited to JDBC batching.
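To illustrate both suggestions together, here is a minimal sketch of a PreparedStatement batch wrapped in an explicit transaction; it is not the asker's final code. Because PgJDBC rejects batched statements that return rows, the sketch batches a plain INSERT into the sentence_text table from the schema shown later in this thread rather than the SELECT insertSentence(...) call itself; the saveSentences helper and the ON CONFLICT clause (PostgreSQL 9.5+) are assumptions:

import java.sql.{DriverManager, SQLException}

// Sketch only: PreparedStatement batching inside a single transaction.
def saveSentences(sentences: Iterator[String], connectionString: String): Unit = {
  val conn = DriverManager.getConnection(connectionString)
  try {
    conn.setAutoCommit(false)  // run the whole batch inside one transaction
    val stmt = conn.prepareStatement(
      "INSERT INTO sentence_text (sent_text) VALUES (?) ON CONFLICT (sent_text) DO NOTHING")
    var i = 0
    sentences.foreach { s =>
      stmt.setString(1, s)     // bound parameter: no manual quoting or escaping
      stmt.addBatch()
      i += 1
      if (i % 1000 == 0) stmt.executeBatch()
    }
    if (i % 1000 != 0) stmt.executeBatch()
    conn.commit()              // everything becomes visible together ...
  } catch {
    case e: SQLException =>
      conn.rollback()          // ... or nothing does
      throw e
  } finally {
    conn.close()
  }
}

With bound parameters there is also no need for the replaceAll("'", "''") escaping used in the original batch.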

Now, stepping back a bit...

Saving an entity involves adding rows to different tables with keys autogenerated by inserting a row in one table being used for inserting some row into another table. Such a logic made me create and use a stored procedure.

That is the simple way to write it, but it won't perform wonderfully. You are doing lots of independent operations on different tables, lots of scattered index updates, and so on. There is also the procedure call overhead and the per-query overhead, and every block in PL/pgSQL carries a non-trivial cost.

With this approach you will run into trouble at hundreds of thousands or millions of rows, never mind billions.

Relational databases work best in terms of sets. If you are really looking at billions of rows, a procedure-call-per-row approach is not going to work. You need to batch the raw input, insert it into a temporary table, and then run a series of queries over the staged data to insert it into the target tables.

If you are on PostgreSQL 9.5 you will benefit from INSERT ... ON CONFLICT ... for your upsert-like operations. You will need to get comfortable with INSERT INTO ... SELECT ..., UPDATE ... FROM ..., data-modifying common table expressions, and so on.
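As a rough illustration of that set-based flow (a sketch, not code from this answer), the snippet below stages the raw rows in a temporary table and then fills the target tables with a few INSERT ... SELECT statements. The staging table name is made up, conn is assumed to be an open JDBC connection, the target tables follow the schema shown later in the thread, and the ON CONFLICT clauses need PostgreSQL 9.5+:

// Sketch only: stage the raw rows, then do set-based inserts.
conn.setAutoCommit(false)
val stmt = conn.createStatement()
stmt.executeUpdate(
  """
  |CREATE TEMP TABLE staging (
  |  warcinfo_id_value char(36),
  |  record_id_value char(36),
  |  sent_text varchar,
  |  sent_timestamp bigint,
  |  sect_ids smallint[]
  |);
  """.stripMargin)

// ... bulk-load the raw rows into `staging` here (batched INSERTs or COPY) ...

// One set-based pass per lookup table instead of one procedure call per row.
stmt.executeUpdate(
  """
  |INSERT INTO warcinfo_id (warcinfo_id_value)
  |  SELECT DISTINCT warcinfo_id_value FROM staging
  |ON CONFLICT (warcinfo_id_value) DO NOTHING;
  """.stripMargin)
stmt.executeUpdate(
  """
  |INSERT INTO record_id (record_id_value)
  |  SELECT DISTINCT record_id_value FROM staging
  |ON CONFLICT (record_id_value) DO NOTHING;
  """.stripMargin)
stmt.executeUpdate(
  """
  |INSERT INTO sentence_text (sent_text)
  |  SELECT DISTINCT sent_text FROM staging
  |ON CONFLICT (sent_text) DO NOTHING;
  """.stripMargin)

// Finally resolve the generated keys with joins and insert the occurrences.
stmt.executeUpdate(
  """
  |INSERT INTO sentence_occurrence (warcinfo_id, record_id, sent_id, timestamp, sect_ids)
  |  SELECT w.warcinfo_id_id, r.record_id_id, t.sent_id,
  |         TO_TIMESTAMP(s.sent_timestamp), s.sect_ids
  |  FROM staging s
  |  JOIN warcinfo_id w ON w.warcinfo_id_value = s.warcinfo_id_value
  |  JOIN record_id r ON r.record_id_value = s.record_id_value
  |  JOIN sentence_text t ON t.sent_text = s.sent_text;
  """.stripMargin)
stmt.executeUpdate("DROP TABLE staging;")
conn.commit()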

Thinking this way hurts for a while, but it is well worth it; you won't believe the performance you get when you work on sets instead of individual items.

I can't write it all out for you: you haven't shown the raw data or the schema, and you haven't explained the details. Which is fine, because that isn't your question. It would be too long anyway, and SO isn't a "write my code for me" site.


OK, I got rid of the stored procedure so that batches no longer fail, and therefore I no longer rely on undocumented behaviour when a batch fails. The batch is now wrapped in a transaction, and Statement has been replaced by PreparedStatement (which, in fact, did not bring any speed improvement in this script). I used INSERT INTO ... SELECT ... and INSERT ... ON CONFLICT ..., so a lot of the logic moved from the stored procedure into the SQL commands.

It now looks like this:

def partition2DB(iterator: Iterator[(String, String, String, Long, Array[Short])]): Unit = {
  val batchSize = 1000
  val nRetries = 10

  def updStatements(item: (String, String, String, Long, Array[Short]), c: Connection,
                    statement1: PreparedStatement, statement2: PreparedStatement,
                    statement3: PreparedStatement, statement4: PreparedStatement) = {
    val sentence = if (item._3.length > 2712) item._3.substring(0, 2712) else item._3
    statement1.setString(1, item._1)
    statement2.setString(1, item._2)
    statement3.setString(1, sentence)
    statement4.setString(1, item._1)
    statement4.setString(2, item._2)
    statement4.setString(3, sentence)
    statement4.setString(4, sentence)
    statement4.setLong(5, item._4)
    statement4.setArray(6, c.createArrayOf("int4", item._5.map(new Integer(_)).asInstanceOf[Array[Object]]))
    statement1.addBatch()
    statement2.addBatch()
    statement3.addBatch()
    statement4.addBatch()
  }

  def executeStatements(statement1: PreparedStatement, statement2: PreparedStatement,
                        statement3: PreparedStatement, statement4: PreparedStatement) = {
    statement1.executeBatch()
    statement2.executeBatch()
    statement3.executeBatch()
    statement4.executeBatch()
  }

  Class.forName(driver)
  var conn: Connection = null

  try {
    conn = DriverManager.getConnection(connectionString)
    conn.setAutoCommit(false)
    val statement1 = conn.prepareStatement("INSERT INTO warcinfo_id (warcinfo_id_value) VALUES (?) ON CONFLICT (warcinfo_id_value) DO NOTHING;")
    val statement2 = conn.prepareStatement("INSERT INTO record_id (record_id_value) VALUES (?) ON CONFLICT (record_id_value) DO NOTHING;")
    val statement3 = conn.prepareStatement("INSERT INTO sentence_text (sent_text) VALUES (?) ON CONFLICT (sent_text) DO NOTHING;")
    val statement4 = conn.prepareStatement(
      """
      |INSERT INTO sentence_occurrence (warcinfo_id, record_id, sent_id, timestamp, sect_ids) VALUES (
      |  (SELECT warcinfo_id_id FROM warcinfo_id WHERE warcinfo_id_value = ?),
      |  (SELECT record_id_id FROM record_id WHERE record_id_value = ?),
      |  (SELECT sent_id FROM sentence_text WHERE md5(sent_text) = md5(?) AND sent_text = ?),
      |  TO_TIMESTAMP(?),
      |  ?
      |)
      """.stripMargin)
    var i = 0
    val batch = ListBuffer[(String, String, String, Long, Array[Short])]()
    conn.setAutoCommit(false)

    def executeBatch() = {
      var attempts = 0
      while (attempts < nRetries) {
        try {
          for (item <- batch) updStatements(item, conn, statement1, statement2, statement3, statement4)
          executeStatements(statement1, statement2, statement3, statement4)
          conn.commit()
          batch.clear()
          attempts += nRetries
        } catch {
          case e: SQLException => {
            attempts += 1
            println("exception caught: " + e.getNextException)
            conn.rollback()
          }
        }
      }
    }

    iterator.foreach(r => {
      i += 1
      batch += r
      if (i % batchSize == 0) {
        executeBatch()
      }
    })
    if (i % batchSize != 0) {
      executeBatch()
    }
  } catch {
    case e: SQLException => println("exception caught: " + e)
  } finally {
    conn.close()
  }
}

This code doesn't look very neat to me, though...

The data is a stream of sentences together with their timestamps and some identifiers of the items they come from. So the content of the r variable is something like: ('4af93233-3515-43da-8b47-71b0dad99ccc', 'd5ea8a14-be65-4281-9a87-24dcbdc3f879', 'The definitive guide is the internet', 1362484800, [1])

Each item is stored in the 'sentence_occurrence' table and, if needed, in 'warcinfo_id', 'record_id', and 'sentence_text'.

The schema is the following:

statement.executeUpdate(
    """ 
    |CREATE TABLE warcinfo_id (
    | warcinfo_id_id serial PRIMARY KEY, 
    | warcinfo_id_value char(36) UNIQUE NOT NULL 
    |); 
    """.stripMargin) 
statement.executeUpdate(
    """ 
    |CREATE TABLE record_id (
    | record_id_id serial PRIMARY KEY, 
    | record_id_value char(36) UNIQUE NOT NULL 
    |); 
    """.stripMargin) 
statement.executeUpdate(
    """ 
    |CREATE TABLE sentence_text (
    | sent_id serial PRIMARY KEY, 
    | sent_text varchar UNIQUE NOT NULL 
    |); 
    """.stripMargin) 
statement.executeUpdate(
    """ 
    |CREATE TABLE sentence_occurrence (
    | entry_id serial PRIMARY KEY, 
    | warcinfo_id integer NOT NULL, 
    | record_id integer NOT NULL, 
    | sent_id integer NOT NULL, 
    | timestamp timestamp NOT NULL, 
    | sect_ids smallint ARRAY 
    |); 
    """.stripMargin) 

Added after Craig's comment:

Thanks, Craig. What are operations on sets of inputs? Could you post a link to an example?

Also, I have the following problem. If two batches try to insert the same record into some table at the same time, I get a java.sql.BatchUpdateException with a message like 'ERROR: deadlock detected. Detail: Process 31959 waits for ShareLock on transaction 24298876; blocked by process 31955. Process 31955 waits for ShareLock on transaction 24298877; blocked by process 31959.' What is the right solution in such a case? I can think of retrying the failed attempt until it succeeds or a retry limit is reached, allowing duplicates to be stored and then producing the final result table with SELECT DISTINCT ..., or playing with isolation levels (e.g. trying 'Read uncommitted'). But all of these are risky workarounds (the retry limit gets exhausted, disk space runs out, bad data ends up in the database).


Well done. You'll get a bigger improvement if the inserts operate on sets of inputs rather than being called one by one, but it should already be an improvement. Ideally you would use PgJDBC's CopyManager to load a temporary table and then process the temporary table. – Craig
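For reference, here is a hedged sketch of what loading such a staging table through CopyManager might look like; the copyIntoStaging helper, the staging table, and the CSV layout are assumptions for illustration, not code from the thread:

import java.io.StringReader
import org.postgresql.PGConnection

// Sketch only: bulk-load a staging table through PgJDBC's COPY support.
// `staging` is the hypothetical temp table from the earlier sketch, and the
// row tuples follow the format described above in this answer.
def copyIntoStaging(conn: java.sql.Connection,
                    rows: Seq[(String, String, String, Long, Array[Short])]): Long = {
  val copyApi = conn.unwrap(classOf[PGConnection]).getCopyAPI
  // Build CSV input; quote text fields and double any embedded quotes.
  val csv = rows.map { case (wid, rid, text, ts, ids) =>
    val quotedText = "\"" + text.replace("\"", "\"\"") + "\""
    val quotedArr = "\"{" + ids.mkString(",") + "}\""
    s"$wid,$rid,$quotedText,$ts,$quotedArr"
  }.mkString("\n")
  copyApi.copyIn(
    "COPY staging (warcinfo_id_value, record_id_value, sent_text, sent_timestamp, sect_ids) " +
      "FROM STDIN WITH (FORMAT csv)",
    new StringReader(csv))
}

The returned value is the number of rows copied; after the COPY, the set-based INSERT ... SELECT statements from the earlier sketch can be run against the staging table.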