2012-03-24 65 views
0

我正在處理從.csv文件中讀取項目的程序,並將它們寫入遠程數據庫。我正在嘗試多線程程序,爲此我創建了2個具有不同連接的進程線程。爲此,將.csv文件讀入緩衝讀取器,並處理緩衝讀取器的內容。但是,線程似乎繼續複製數據(將每個元組的兩個副本寫入數據庫)。緩衝閱讀器和優先級隊列一起工作?

我一直在試圖找出如何互斥在Java中的緩衝器,並且我能想出最接近的事是一個優先級隊列。

我的問題是,你可以使用一個緩衝的閱讀器來閱讀一個文件分割成線優先級隊列線? I.E.

public void readFile(Connection connection) { 
     BufferedReader bufReader = null; 
     try{ 
      bufReader = new BufferedReader(new FileReader(RECS_FILE)); 
      bufReader.readLine(); //skip header line 
      String line; 
      while((line = bufReader.readLine()) != null) { 
       //extract fields from each line of the RECS_FILE 
       Pattern pattern = Pattern.compile("\"([^\"]+)\",\"([^\"]+)\",\"([^\"]+)\",\"([^\"]+)\""); 
       Matcher matcher = pattern.matcher(line); 
       if(!matcher.matches()) { 
        System.err.println("Unexpected line in "+RECS_FILE+": \""+line+"\""); 
        continue; 
       } 
       String stockSymbol = matcher.group(1); 
       String recDateStr = matcher.group(2); 
       String direction = matcher.group(3); 
       String completeUrl = matcher.group(4); 

       //create recommendation object to populate required fields 
       // and insert it into the database 
       System.out.println("Inserting to DB!"); 
       Recommendation rec = new Recommendation(stockSymbol, recDate, direction, completeUrl); 
       rec.insertToDb(connection); 
      } 
     } catch (IOException e) { 
      System.err.println("Unable to read "+RECS_FILE); 
      e.printStackTrace(); 
     } finally { 
      if(bufReader != null) { 
       try{ 
        bufReader.close(); 
       } catch (IOException e) { 
       } 
      } 
     } 

    } 

您會看到使用緩衝讀取器讀取.csv文件。有沒有辦法在函數之外設置一個優先級隊列,以便緩衝讀取器將元組放入優先級隊列,然後每個程序線程訪問優先級隊列?

回答

1

緩衝讀取器或實際上任何讀取器或流的性質僅限於單線程使用。優先級隊列是一個完全獨立的結構,根據實際的實現,它可能會或可能不會被多個線程使用。所以簡短的回答是:不,他們是兩個完全不相關的概念。

要解決您的原始問題:您不能使用多線程流式文件訪問。理論上你可以使用RandomAccessFile,除了你的行不是固定的寬度,因此你不能在行的開頭輸入seek()而不讀取文件中到目前爲止的所有內容。此外,即使您的數據由固定記錄組成,但使用兩個不同的線程讀取文件可能也不切實際。

可以parallelise的唯一的事情就是數據庫插入,與你失去事務性,因爲你必須使用單獨的事務每個線程的明顯警告。 (如果你不這樣做,你必須同步您的數據庫操作,這再次意味着你還沒有贏得任何東西。)

這樣一個解決方案可以是從一個線程讀取線,並通過對字符串通過ExecutorService調用的處理方法。這樣可以很好地擴展,但是又有一點需要注意:增加數據庫鎖定開銷可能會使使用多線程的優勢無效。

最終的教訓可能不是過於複雜的事情:儘量簡單的方法,只找一個更復雜的解決方案,如果簡單的一個沒有工作。另一個教訓也許是多線程不能幫助I/O綁定的程序。

0

@ Biziclop的答案是(+1)點,但我想我會添加一些關於散裝數據庫插入。

如果您不知道,在大多數SQL數據庫中關閉數據庫自動提交在批量插入過程中是一個巨大的勝利。通常,在每個SQL語句之後,數據庫將其提交到磁盤存儲,該存儲更新索引並對磁盤結構進行所有更改。通過關閉此自動提交功能,數據庫只需在最後調用commit時進行這些更改。通常,你會做這樣的事情:

conn.setAutoCommit(false); 
for (Recommendation rec : toBeInsertedList) { 
    rec.insertToDb(connection); 
} 
conn.setAutoCommit(true); 

此外,如果自動提交不被你的數據庫支持,往往包裹插入在交易完成同樣的事情。

這裏有一些其他的答案,可以幫助: