2011-05-18 58 views
2

所以這看起來像是一個非常常見的用例,也許我在想它,但是我在保持多線程集中度量方面存在問題。假設我有多個工作線程的所有處理記錄,並且我想每隔1000條記錄吐出一些指標。現在我可以讓每個線程都記錄單個度量標準,但是然後獲得吞吐量數字,但我不得不手動添加它們(當然時間邊界將不準確)。這裏有一個簡單的例子:來自多個線程的指標

public class Worker implements Runnable { 

    private static int count = 0; 
    private static long processingTime = 0; 

    public void run() { 
     while (true) { 
      ...get record 
      count++; 
      long start = System.currentTimeMillis(); 
      ...do work 
      long end = System.currentTimeMillis(); 
      processingTime += (end-start); 
      if (count % 1000 == 0) { 
       ... log some metrics 
       processingTime = 0; 
       count = 0; 
      } 
     } 
    } 
} 

希望有道理。另外我知道兩個靜態變量可能是AtomicInteger和AtomicLong。 。 。但也許不是。對人們有什麼樣的想法感興趣。我曾考慮過使用原子變量和使用ReeantrantReadWriteLock--但我真的不希望指標停止處理流程(即指標應該對處理的影響非常小)。謝謝。

回答

2

卸載實際加工到另一個線程可以是一個好主意。這個想法是封裝你的數據並迅速把它交給一個處理線程,這樣可以最大限度地減少對做有意義工作的線程的影響。

存在較小的切換爭用,但成本通常比任何其他類型的同步小很多,在許多情況下它應該是一個很好的候選者。我認爲M. Jessup的解決方案與我的相近,但希望以下代碼清楚地說明了這一點。

public class Worker implements Runnable { 

    private static final Metrics metrics = new Metrics(); 

    public void run() { 
     while (true) { 
     ...get record 
     long start = System.currentTimeMillis(); 
     ...do work 
     long end = System.currentTimeMillis(); 
     // process the metric asynchronously 
     metrics.addMetric(end - start); 
    } 
    } 

    private static final class Metrics { 
    // a single "background" thread that actually handles 
    // processing 
    private final ExecutorService metricThread = 
      Executors.newSingleThreadExecutor(); 
    // data (no synchronization needed) 
    private int count = 0; 
    private long processingTime = 0; 

    public void addMetric(final long time) { 
     metricThread.execute(new Runnable() { 
      public void run() { 
       count++; 
       processingTime += time; 
       if (count % 1000 == 0) { 
       ... log some metrics 
       processingTime = 0; 
       count = 0; 
       } 
      } 
     }); 
     } 
    } 
} 
+0

+1,但我總是懷疑這樣的解決方案。因爲實際上是實現你想要的?也就是說,操作系統允許此日誌記錄的線程搶佔成本會減少其他工作線程實現的度量數量(如果number_of_working_threads> cores_on_machine,這只是一個有效問題)。在您已經獲得指標後,不要在同一個線程上執行此操作。 – 2011-05-18 15:50:48

+0

確實,這在某些條件下效果最好。我會稍微修改您的條件爲number_of_CPU_busy_threads> cores_on_machine。實際上,很多線程將處理時間的大部分花費在空閒狀態(I/O等被阻塞)。在大多數情況下,真正活動的線程數量不會超過內核數量,否則無論如何你實際上都超過了CPU容量。如果動作的本質是「序列化」(例如記錄到文件),這種模式也可以很好地工作。 – sjlee 2011-05-18 16:56:41

+0

我真的很喜歡這個答案 - 不能說它將如何影響蝙蝠的處理時間,但我的猜測是指標線程將保持非常繁忙,而不會影響運行記錄處理器。 – Gandalf 2011-05-18 19:32:52

1

如果您依賴count的狀態和processingTime的狀態同步,那麼您將不得不使用Lock。例如,如果++count % 1000 == 0爲真,那麼您希望評估此時的processingTime指標。

對於這種情況,使用ReentrantLock是有意義的。我不會使用RRWL,因爲實際上並不存在純粹讀取發生的實例。它始終是一個讀/寫設置。但你將需要鎖定所有的

count++ 
    processingTime += (end-start); 
    if (count % 1000 == 0) { 
     ... log some metrics 
     processingTime = 0; 
     count = 0; 
    } 

無論計數++是否將在該位置,您將需要鎖定這一點。 最後,如果您使用鎖定,則不需要AtomicLong和AtomicInteger。它只是增加了開銷,並沒有更多的線程安全。

+0

我相信我說的是實際的問題。 – Gandalf 2011-05-18 13:59:22

+0

當你寫道,'但也許不是'我認爲你的意思是你會採取不使用AtomicLong的併發命中:) – 2011-05-18 14:07:31

2

我會建議,如果你不想讓日誌干擾處理,你應該有一個單獨的日誌工作線程,並讓你的處理線程只是提供一些類型的值對象,可以切換。在這個例子中,我選擇了LinkedBlockingQueue,因爲它可以使用offer()來阻塞很少的時間,並且可以將阻塞推遲到另一個從隊列中拉取值的線程。您可能需要增加MetricProcessor中的邏輯以根據您的需求訂購數據等,但即使它是長時間運行的操作,它也不會讓VM線程調度程序同時重新啓動實際處理線程。

public class Worker implements Runnable { 

    public void run() { 
    while (true) { 
     ... do some stuff 
     if (count % 1000 == 0) { 
     ... log some metrics 
     if(MetricProcessor.getInstance().addMetrics(
      new Metrics(processingTime, count, ...)) { 
      processingTime = 0; 
      count = 0; 
     } else { 
      //the call would have blocked for a more significant 
      //amount of time, here the results 
      //could be abandoned or just held and attempted again 
      //as a larger data set later 
     } 
     } 
    } 
    } 
} 

public class WorkerMetrics { 
    ...some interesting data 
    public WorkerMetrics(... data){ 
    ... 
    } 
    ...getter setters etc 
} 

public class MetricProcessor implements Runnable { 
    LinkedBlockingQueue metrics = new LinkedBlockingQueue(); 
    public boolean addMetrics(WorkerMetrics m) { 
    return metrics.offer(m); //This may block, but not for a significant amount of time. 
    } 

    public void run() { 
    while(true) { 
     WorkMetrics m = metrics.take(); //wait here for something to come in 
     //the above call does all the significant blocking without 
     //interrupting the real processing 
     ...do some actual logging, aggregation, etc of the metrics 
    } 
    } 
} 
+0

LinkedBlockingQueue絕對阻止添加。你在想ConcurrentLinkedQueue嗎? drainTo也會阻塞。添加線程的情況繞過了原來的問題,不管你做什麼,你都會有多線程的序列化。無法在我看來添加其他線程。 – 2011-05-18 14:49:53

+0

是的,它提供了封鎖功能,但是如果它進行完整的指標記錄,封鎖的時間與任何可能阻擋的時間相比是微不足道的。Op做出了聲明「但我真的不希望度量標準停止處理流程」,所以我們在這裏委託讓處理流程繼續。並確保drainTo阻止,但如果這是在度量處理中,它並不重要,因爲它與主處理分離。如果你沒有另一個線程,你如何在不阻塞一個或多個處理線程的情況下序列化一段時間? – 2011-05-18 15:27:56

+0

如果爲記錄指標做了大量工作,則可以使參數在另一個線程中執行。但你應該更新你的答案,並刪除添加,提供和採取不會阻止的語句。這對OP沒有幫助,因爲它不符合事實。 – 2011-05-18 15:45:16

相關問題