2015-07-12 37 views
2

我在多個線程中運行我的threadPool每個線程讀取一個巨大的文件並從列表中返回此文件中的數據。線程池中的多個線程在同一列表中寫入數據

代碼如下所示:

class Writer{ 


    ArrayList finalListWhereDataWillBeWritten = new Array<Integer>() 
    for(query q : allQueries){ //all the read queries to read file 

     threadPool.submit(new GetDataFromFile(fileName,filePath));   

    }//all the read queries have been submitted. 

} 

現在我知道下面的代碼段會出現一些地方在我的代碼,但我不知道在哪裏把它。 因爲如果我將它放在for循環中的submit()之後,它不會添加它,因爲每個文件都非常大,可能尚未完成處理。

synchronized(finalListWhereDataWillBeWritten){ 

    //process the data obtained from single file and add it to target list 
     finalListWhereDataWillBeWritten.addAll(dataFromSingleThread); 

} 

因此,誰能告訴我,在哪裏我把這個代碼塊,我需要什麼其他的事情,以確保如此DONOT發生的臨界區問題。

class GetDataFromFile implements Runnable<List<Integer>>{ 

    private String fileName; 
    private String filePath; 

    public List<Integer> run(){ 
     //code for streaming the file fileName 
     return dataObtainedFromThisFile; 
    } 

} 

?我需要使用wait()/notifyAll()方法在我的代碼因爲我只在線程從文件中讀取數據平行並將它們放在一個共享列表

+0

這是一個半烤問題。請在問題結束前發佈所有相關代碼。 – CKing

+0

您還需要什麼信息?我可以提供 – veer

+0

對於初學者,您展示的兩個代碼片段之間發生了什麼。什麼是'dataFromSingleThread'?爲什麼不發佈實際的代碼片段呢? – CKing

回答

0

UPDATE請考慮由馬爾科提供答案。如果你想確保你的清單上工作之前,你的線程都完成,這是遠遠好

,請執行下列操作:

import java.util.List; 
import java.util.Vector; 

public class ThreadWork { 

    public static void main(String[] args) { 

    int count = 5; 
    Thread[] threads = new ListThread[count]; 
    List<String> masterList = new Vector<String>(); 

    for(int index = 0; index < count; index++) { 
     threads[index] = new ListThread(masterList, "Thread " + (index + 1)); 
     threads[index].start(); 
    } 
    while(isOperationRunning(threads)) { 
     // do nothing 
    } 

    System.out.println("Done!! Print Your List ..."); 

    for(String item : masterList){ 
     System.out.println("[" + item + "]"); 
    } 
    } 

    private static boolean isOperationRunning(Thread[] threads) { 
    boolean running = false; 

    for(Thread thread : threads) { 
     if(thread.isAlive()) { 
     running = true; 
     break; 
     } 
    } 
    return running; 
    } 
} 

class ListThread extends Thread { 
    private static String items[] = { "A", "B", "C", "D"}; 
    private List<String> list; 
    private String name; 

    public ListThread(List<String> masterList, String threadName) { 
    list = masterList; 
    name = threadName; 
    } 

    public void run() { 
    for(int i = 0; i < items.length;++i) { 
     randomWait(); 
     String data = "Thread [" + name + "][" + items[i] + "]"; 
     System.out.println(data); 
     list.add(data); 
    } 
    } 

    private void randomWait() { 
    try { 
     Thread.currentThread(); 
     Thread.sleep((long)(3000 * Math.random())); 
    } 
    catch (InterruptedException x) {} 
    } 
} 
+0

那麼你的代碼是好的,將做我想要的.....但你不認爲輪詢它不斷會增加執行這個程序的時間? – veer

+0

線程的要點是同時運行進程。如果我有10個文件需要處理,我可以一次處理10個線程,而不是一個。然而,有時候,只有在所有線程完成之後,您才需要對線程工作的結果採取行動。再次,如果這不是你想要的,讓我知道,我會刪除這個答案 – Constantin

+0

爲什麼你要求一次又一次刪除...可能是你的答案可以幫助未來的人。 – veer

3

您應該簡單地實施Callable<List<Integer>>而不是重新發明輪子,並將其提交給JDK的標準Executor服務。然後,隨着期貨的完成,您將結果收集到列表中。

final ExecutorService threadPool = 
    Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors()); 
final List<Future<List<Integer>>> futures = new ArrayList<>(); 
for(query q : allQueries) { 
    futures.add(threadPool.submit(new GetDataFromFile(fileName, filePath))); 
} 
for (Future<List<Integer>> f : futures) { 
    finalListWhereDataWillBeWritten.addAll(f.get()); 
} 

,這是所有假設你是下面的Java 8.在Java 8當然你可以使用並行流:

final List<Integer> finalListWhereDataWillBeWritten = 
    allQueries.parallelStream() 
      .flatMap(q -> getDataFromFile(q.fileName, q.filePath)) 
      .collect(toList());