我正在寫一個程序,需要從一個非常大的文件(400K +行)中讀取行,並將每行中的數據發送到Web服務。我決定嘗試線程化,並且看到我沒有想到的一些行爲,看起來像我的BufferedReader開始重用它在給它調用readline()時已經給出的行。從同一文件/奇怪行爲讀取多個線程
我的程序由兩個類組成。一個「Main」類,它啓動線程並保存對BufferedReader的靜態引用,並具有靜態同步的「readNextLine()」方法,線程可以使用該方法基本上調用BufferedReder上的readLine()。和「Runnable」類調用readNextLine()並使用來自每個readNextLine()調用的數據進行webservice調用。我創建了BufferedReader和readNextLine()靜態,因爲除了將主類的實例傳遞到線程之外,我只能想到線程共享讀者的唯一方法,我不確定哪個更好。
大約5分鐘後,我開始在我的web服務中看到錯誤,表示它正在處理它已經處理過的一行。我能夠驗證線路確實是多次分開發送的。
有沒有人有任何想法,爲什麼BufferedReader似乎給線程它已經讀取的線?我的印象是readline()是連續的,我所需要做的就是確保對readline()的調用是同步的。
我會在下面展示一些主類代碼。 runnable本質上是一個while循環,它調用readNextLine()並處理每一行,直到沒有剩下的行。
主要類:
//showing reader and thread creation
inputStream = sftp.get(path to file);
reader = new BufferedReader(new InputStreamReader(inputStream));
ExecutorService executor = Executors.newFixedThreadPool(threads);
Collection<Future> futures = new ArrayList<Future>();
for(int i=0;i<threads;i++){
MyRunnable runnable = new MyRunnable(i);
futures.add(executor.submit(runnable));
}
LOGGER.debug("futures.get()");
for(Future f:futures){
f.get(); //use to wait until all threads are done
}
public synchronized static String readNextLine(){
String results = null;
try{
if(reader!=null){
results = reader.readLine();
}
}catch(Exception e){
LOGGER.error("Error reading from file");
}
return results;
}
我認爲你需要使用'RandomAccessFile'並讓每個線程讀取從一個不同的偏移量,儘管我會使用單個線程來讀取文件的塊和每個塊讀取,拆分多個線程來聯繫您的web服務與塊的一部分。 –
其實我只是偶然發現了這個,http://docs.oracle.com/javase/7/docs/api/java/nio/channels/AsynchronousFileChannel.html我認爲它可能會做你想要做的事情,如果Java 7是一個選項。 –