2017-09-26 92 views
0

我的問題是,我有一個1000條記錄的數據集。我想要3個線程處理這樣的數據, 從記錄1到300的線程1,從301到600的線程2等等。一個線程可以發出請求並一次獲取50條記錄,創建一個對象並將其放入隊列中。主線程將同時從隊列中讀取數據。主線消費者和其他線程的生產者

下面是代碼,我面對的問題是,recordRead變量告訴線程應該從哪裏開始讀取記錄的起點。 但我怎麼能爲每個線程設置不同的值,例如對於thread1,它應該是0,recordsToRead應該是300,對於thread2,recordRead應該是300,recordsToRead是300 + 300 = 600,最後一個線程應該是600,結束。 pagesize = 50 pagesize,recordRead和recordToRead是屬於主類和主線程的所有變量。

ExecutorService service = Executors.newFixedThreadPool(nThreads); 
    while(nThreads > 0) { 
     nThreads--; 
     service.execute(new Runnable() { 

      @Override 
      public void run() { 
       // TODO Auto-generated method stub 

       do { 
        int respCode = 0; 
        int RecordsToRead = div; 
        JSONObject jsObj = new JSONObject(); 
        jsObj.put("pagesize", pageSize); 
        jsObj.put("start", recordsRead); 
        jsObj.put("searchinternalid", searchInternalId); 

        try { 
         boolean status = req.invoke(jsObj); 
         respCode = req.getResponseCode(); 

        } catch (Exception e) {   
         req.reset(); 
         e.printStackTrace(); 
         return true; 
        } 
        JSONObject jsResp = req.getResponseJson(); 
        //here jsResp will be added to ArrayBlockingQueue. 

        req.reset(); 
       }while(!isError && !isMaxLimit && recordsRead < RecordsToRead); 

      } 

     }); 
    } 

此循環將是主線程讀取隊列的代碼。 如何爲所有線程設置recordsRead和recordToread。

以及如何使主線程等待,直到一個線程插入隊列中的對象。

+0

您可以創建'Runnable'的子類作爲類ctor參數的起始位置(以及其他)。 –

回答

0

我在你看來定義了兩個問題。第一個問題是執行並行塊計算,第二個問題是從中創建一個連續的管道。讓我們從第一個問題開始。要使用預定義大小進行並行計算,最佳選擇fmpv將使用fork-join框架。不僅在性能上(偷工作真的很有效),而且由於代碼簡單。但是因爲你對我來說只限於3個線程,所以直接使用線程似乎也是有效的。只要你想要我可以這樣實現:

final int chunkSize = 300; 
    //you can also use total amount of job 
    //int totalWork = 1000 and chunk size equals totalWork/threadsNumber 
    final int threadsNumber = 3; 

    Thread[] threads = new Thread[threadsNumber]; 

    for (int ii = 0; ii < threadsNumber; ii++) { 
     final int i = ii; 

     threads[ii] = new Thread(() -> { 
      //count your variable according the volume 
      // for example you can do so 
      int chunkStart = i * chunkSize; 
      int chunkEnd = chunkStart + chunkSize; 
      for(int j = chunkStart; j < chunkEnd; j++) { 
       //object creation with necessary proprs 
       //offer to queue here 
      } 
     }); 

     threads[ii].start(); 
    } 

    //your code here 
    //take here 

    for (int ii = 0; ii < threadsNumber; ii++) { 
     try { 
     //this part is only as example 
     //you do not need it     
     //here if you want you can also w8 for completion of all threads 
      threads[ii].join(); 
     } catch (InterruptedException e) { 
      e.printStackTrace(); 
     } 
    } 

現在關於消費的第二個問題。對於這個puprose,你可以使用例如ConcurrentLinkedBlockingQueue(http://www.jgroups.org/javadoc/org/jgroups/util/ConcurrentLinkedBlockingQueue.html)。在生產者線程中提供報價並使用主要的take方法。

但說實話,我仍然沒有得到你的問題的原因。你想創建連續的管道還是隻是一次計算?

另外我會建議你參加這門課程:https://www.coursera.org/learn/parallel-programming-in-java/home/welcome。 這將幫助您完全解決您的問題並提供各種解決方案。還有併發和分佈式計算課程。

相關問題