2017-04-02 48 views
0

我有一個類,其中我的add方法被多個線程調用以填充clientidToTimestampHolder LinkedBlockingQueue。然後在下面的類中,我開始一個後臺線程,每30毫秒運行一次並調用processData()方法,將clientidToTimestampHolder排除到Map List,然後迭代該List以通過調用適當的方法將數據發送到不同的服務。當底層方法返回時,並行地執行for循環而不是順序排列

我可以用不同的時間戳獲得相同的userid多次,所以這就是爲什麼我使用LinkedBlockingQueue和map。

public class Handler { 
    private final ScheduledExecutorService executorService = Executors 
     .newSingleThreadScheduledExecutor(); 
    private final LinkedBlockingQueue<Map.Entry<String, Long>> clientidToTimestampHolder = 
     new LinkedBlockingQueue<>(); 

    private static class Holder { 
    private static final Handler INSTANCE = new Handler(); 
    } 

    public static Handler getInstance() { 
    return Holder.INSTANCE; 
    } 

    private Handler() { 
    executorService.scheduleAtFixedRate(new Runnable() { 
     @Override 
     public void run() { 
     processData(); 
     } 
    }, 0, 30, TimeUnit.MILLISECONDS); 
    } 

    // called by multiple threads to populate clientidToTimestampHolder map 
    public void add(final String clientid, final Long timestamp) { 
    clientidToTimestampHolder.offer(Maps.immutableEntry(clientid, timestamp)); 
    } 

    // called by single background thread every 30 milliseconds 
    public void processData() { 
    final List<Map.Entry<String, Long>> entries = new ArrayList<>(); 
    clientidToTimestampHolder.drainTo(entries); 
    for (Map.Entry<String, Long> entry : entries) { 
     String clientid = entry.getKey(); 
     long timestamp = entry.getValue(); 
     boolean isUpdated = isUpdatedClient(clientid, timestamp); 
     if (!isUpdated) { 
     updateClient(String.valueOf(clientid)); 
     } 
    } 
    } 
} 

是我上面的代碼線程安全,沒有競爭條件?由於add方法將從多個線程調用,然後我有一個後臺線程,每30毫秒運行一次,調用processData()方法,該方法從clientidToTimestampHolder提取數據LinkedBlockingQueue。

因爲現在看起來像在重負載下,我的processData()方法可能會丟失幾個通過add方法添加的條目。我在我的add方法中添加了幾個日誌,其中打印出clientid,但不知怎的,相同的clientid沒有打印出我在isUpdatedClient方法中添加的日誌,因此使它對我產生懷疑。

我與Java 7

+0

如果你想在被調用的方法中檢查異常,或者你期望返回結果,那麼使用callable,否則你可以使用runnable – MohanaPriyan

+0

@MohanaPriyan沒有返回結果,並且submit(Runnable)檢查異常的方式與提交(贖回)。所以提交(Runnable)是可以的。 –

回答

0

submit(Runnable)工作會的工作方式submit(Callable)你現在使用的一樣。 還有一些方面可能會殺死並行執行的所有優點。

首先,您使用newCachedThreadPool。這意味着,當所有池線程都忙時,會創建一個新的線程池線程。線程創建是一個昂貴的操作,並消耗大量的內存。您應該限制池中的線程數量。如果您的任務純粹是計算性的,請使用可用處理器的數量。如果任務訪問數據庫,請使用數據庫可以處理的最大併發連接數。其次,如果您的任務(其中包括isUpdatedClient()updateClient)太小,則任務創建和提交的開銷可能太高。在這種情況下,我會創建包含多個條目的任務,並在循環中處理它們。

0

我會做下列方式:

  1. 創建其中incapsulates一些數據,你需要處理類。處理的邏輯。在這種情況下,它可能類似於ClientProcessorClient字段
  2. 我不明白爲什麼您需要一些第三方執行程序實現,所以我將使用Java執行程序。但是,與另一種實現方法將類似。
  3. 通過從源碼/使用某些工廠 - >將它們讀取到執行程序來創建所需數量的ClientProcessor對象。

下面是一個簡單的例子:

public class Client { 
    private String id; 
    private String timeStamp; 

    public Client(String id, String timeStamp) { 
     this.id = id; 
     this.timeStamp = timeStamp; 
    } 

    public String getId() { 
     return id; 
    } 

    public String getTimeStamp() { 
     return timeStamp; 
    } 
} 

ClientProcessor:

public class ClientProcessor implements Runnable{ 

    private Client client; 

    public ClientProcessor(Client client) { 
     super(); 
     this.client = client; 
    } 

    @Override 
    public void run() { 

     // do anything needed with the Client field 
     System.out.println(client); 
    } 
} 

主要

import java.util.concurrent.ExecutorService; 
import java.util.concurrent.Executors; 
import java.util.concurrent.TimeUnit; 

public class Main { 
    public Main() throws InterruptedException { 

     ExecutorService executor = Executors.newFixedThreadPool(4); 
     for (int i = 0; i < 5; i++) { 
      executor.submit(new ClientProcessor(new Client("id" + i, "timestamp_" + i))); 
     } 

     executor.shutdown(); 
     executor.awaitTermination(5, TimeUnit.SECONDS); 
    } 
} 
0

一種完全不同的方法:使用Java8流。

含義:第一返工你的代碼入手:

entries.stream(). ... 

因爲那樣的話,把它改爲「平行」 威力只需要

entries.parallelStream(). ... 

來取代它,我不是太還有很多流入;但有很好的示例網站,如here