2016-10-23 114 views

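I built a Grails application that pulls a 300 MB text file from Google Storage over HTTP (a one-off import). The text file contains 35,000,000 codes that need to be stored in a MySQL database. How can I force a flush to the database when saving a large amount of data in Grails?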

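I created a Thread that loops over the incoming InputStream, creates a domain object for each line, collects the objects in a list, and batch-saves that list every 100 iterations.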

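The process takes several hours to complete (which is fine). The problem is that when I query the table while it runs, I don't see any of the saved records. They are being cached or buffered somewhere and seem to wait until the whole process has finished, which is exactly what I don't want!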

Code snippet

synchronized processImport (String url, String importType) throws RuntimeException { 

    InputStream stream = new URL(url).openStream(); 
    BufferedReader reader = new BufferedReader(new InputStreamReader(stream)); 

    String code; 
    int i = 0; 
    try { 
     List<CodeDomainObject> buffer = new ArrayList<>(); 
     while((code = reader.readLine()) != null) { 
      try { 
       buffer.add(new CodeDomainObject([code: code, used: false, type: importType])); 
       if (i % 100 == 0) { 
        CodeDomainObject.saveAll(buffer); 
        buffer.clear(); 
       } 

      } catch (Exception ex) { 
       println ("Save error:" + ex.getMessage()) 
      } 
      i++; 
     } 
     CodeDomainObject.saveAll(buffer); 

    } catch (Exception ex) { 
     throw ex; 
    } finally { 
     reader.close(); 
     stream.close(); 
    } 
} 

Notes

  1. sessionFactory.getCurrentSession().clear() does not seem to do anything.
  2. flush: true does not seem to do anything.
  3. I have just implemented a custom thread in a service to do this work.
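The buffering described in the question can be separated from the persistence call, which makes the batch boundaries easier to reason about. The following is a minimal Java sketch under stated assumptions: `LineBatcher` and `forEachBatch` are hypothetical names, not part of Grails or Hibernate, and the `sink` callback stands in for whatever actually saves a batch. It tests the buffer size rather than a loop counter, so the first batch is a full one (the loop in the question checks `i % 100 == 0` while `i` is still 0, so its first save holds a single object).

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.StringReader;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Hypothetical helper for illustration; not part of Grails or Hibernate. */
final class LineBatcher {

    private LineBatcher() {}

    /**
     * Reads lines from the reader and hands them to the sink in lists of
     * at most batchSize elements. Returns the total number of lines read.
     */
    static long forEachBatch(BufferedReader reader, int batchSize,
                             Consumer<List<String>> sink) throws IOException {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        List<String> buffer = new ArrayList<>(batchSize);
        long total = 0;
        String line;
        while ((line = reader.readLine()) != null) {
            buffer.add(line);
            total++;
            // Check the buffer size, not a loop counter, so every batch
            // except possibly the last one is exactly batchSize long.
            if (buffer.size() == batchSize) {
                sink.accept(new ArrayList<>(buffer)); // copy, the buffer is reused
                buffer.clear();
            }
        }
        // Hand over whatever is left after the last full batch.
        if (!buffer.isEmpty()) {
            sink.accept(new ArrayList<>(buffer));
        }
        return total;
    }

    public static void main(String[] args) throws IOException {
        BufferedReader reader =
            new BufferedReader(new StringReader("a\nb\nc\nd\ne\n"));
        long total = forEachBatch(reader, 2, batch -> System.out.println(batch));
        System.out.println(total + " lines");
    }
}
```

In the import from the question, the sink would be the place to save one batch and make it durable, for example by committing a transaction per batch.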

Answers


I solved the problem myself. For those who are interested:

Use a stateless Hibernate session and commit the transactions manually:

synchronized processImport (String url, String importType) throws RuntimeException { 

    InputStream stream = new URL(url).openStream(); 
    BufferedReader reader = new BufferedReader(new InputStreamReader(stream)); 

    String code; 
    int i = 0; 

    StatelessSession session = sessionFactory.openStatelessSession(); 
    def tx = session.beginTransaction(); 

    try { 
     while((code = reader.readLine()) != null) { 

      if (i % 1000 == 0 && i > 0) { 
       // Commit after every 1000 records, then begin a new transaction. 
       tx.commit(); 
       Thread.sleep(1000); // I just added this to give the GC a chance 
       tx = session.beginTransaction(); 
      } 

      session.insert(new MyDomainObject([code: code, used: false, type: importType])); 

      i++; 
     } 

     tx.commit(); 
     Thread.sleep(1000); 

    } catch (Exception ex) { 
     throw ex; 
    } finally { 
     reader.close(); 
     stream.close(); 
     session.close(); 
    } 
} 
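The likely reason this works where flushing did not: a flush only sends the pending SQL to the database, and rows written inside a transaction stay invisible to other connections until that transaction commits (unless the reader uses READ UNCOMMITTED isolation). Committing per batch is what makes the rows show up while the import is still running. The same idea can be sketched with plain JDBC instead of a Hibernate StatelessSession. This is only an illustration under stated assumptions: `JdbcCodeImporter`, the table name `code_domain_object`, and its columns are hypothetical and not taken from the original application.

```java
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.Iterator;

/** Hypothetical sketch; class, table, and column names are assumptions. */
final class JdbcCodeImporter {

    // Assumed table layout: code_domain_object(code, used, type).
    static final String INSERT_SQL =
        "INSERT INTO code_domain_object (code, used, type) VALUES (?, ?, ?)";

    private JdbcCodeImporter() {}

    /** True when the row just added completes a full batch. */
    static boolean completesBatch(long rowsAdded, int batchSize) {
        return rowsAdded > 0 && rowsAdded % batchSize == 0;
    }

    /** Inserts all codes, committing once per batch. Returns the row count. */
    static long importCodes(Connection connection, Iterator<String> codes,
                            String importType, int batchSize) throws SQLException {
        boolean previousAutoCommit = connection.getAutoCommit();
        connection.setAutoCommit(false); // take control of transaction boundaries
        long rows = 0;
        try (PreparedStatement insert = connection.prepareStatement(INSERT_SQL)) {
            while (codes.hasNext()) {
                insert.setString(1, codes.next());
                insert.setBoolean(2, false);
                insert.setString(3, importType);
                insert.addBatch();
                rows++;
                if (completesBatch(rows, batchSize)) {
                    insert.executeBatch();
                    // Rows become visible to other connections at this point.
                    connection.commit();
                }
            }
            insert.executeBatch(); // send the remaining rows, if any
            connection.commit();
            return rows;
        } catch (SQLException ex) {
            connection.rollback(); // discard the uncommitted partial batch
            throw ex;
        } finally {
            connection.setAutoCommit(previousAutoCommit);
        }
    }
}
```

With a batch size of 1000, a query from another connection should see the row count grow in steps of 1000 while the import runs. If the import fails, only the batch that was in flight is lost.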

You should use save(flush: [true/false]) instead of that odd hand-rolled buffering and manual transaction committing:

new URL(url).withReader { reader -> 
    MyDomainObject.withTransaction { 
        int counter = 0 
        reader.eachLine { String line -> 
            counter++ 
            // Flush the session on every 1000th record. 
            new MyDomainObject(code: line).save(flush: 0 == counter % 1000) 
        } 
    } 
} 

And yes, if your code runs in another thread, it should be wrapped in MyDomainObject.withTransaction{}.


An alternative that lets Grails create and destroy the session and the transaction:

synchronized processImport(String url, String importType) throws RuntimeException { 

    InputStream stream = new URL(url).openStream(); 
    BufferedReader reader = new BufferedReader(new InputStreamReader(stream)); 

    String code; 
    int i = 0; 
    try { 
     List<CodeDomainObject> buffer = [] 
     while ((code = reader.readLine()) != null) { 
      try { 
       buffer.add(new CodeDomainObject([code: code, used: false, type: importType])); 
       if (i % 1000 == 0 && i > 0) { 
        flushBuffer(buffer) 
       } 
      } catch (Exception ex) { 
       println("Save error:" + ex.getMessage()) 
      } 
      i++; 
     } 
     flushBuffer(buffer) 

    } catch (Exception ex) { 
     throw ex; 
    } finally { 
     reader.close(); 
     stream.close(); 
    } 
} 

private void flushBuffer(List<CodeDomainObject> buffer) { 
    CodeDomainObject.withNewSession { 
     CodeDomainObject.withNewTransaction { 
      CodeDomainObject.saveAll(buffer); 
      buffer.clear(); 
     } 
    } 
} 