2016-05-29 57 views
0

我需要從SomeObjectHandler實現handle()方法,該方法將SomeObject委託給某個外部系統(見下文)。具有正確的hashCode和equals方法的SomeObject。方法句柄(SomeObject someObject)可以從多個線程中調用(例如10)。外部系統可以同時操作不等於someObject,但是如果系統試圖同時使用相同的someObject操作,它會中斷。我需要實現這個類來防止同等處理someObject。即使一些someObject是相等的,它們都應該被處理。發送來自Java中多線程的消息

現在,我想我需要從併發庫中使用類似隊列的東西,但我不知道是哪一個。

UPD:我只需要使用標準的Java庫。如果有可能達到最大吞吐量。

回答

0

我不是100%確定,如果我完全得到你的問題,但我認爲有多種方法可以解決這個問題。

1)如前所述,您可以使用隊列插入對象,並確保外部系統以同步方式處理對象,因爲如您所說,外部系統無法同時處理相同的對象。

2)在發件人代碼本身處理。我已經多次嘗試過。這是一段代碼片段。這種方法的好處是,它只能同步相同的對象。只要確保在最終塊中處理移除部分。對不起,如果代碼不整潔。我是新的:)

Map<SomeObject,Integer> objMap=new ConcurrentHashMap<SomeObject,Integer>(); 

public void handle(SomeObject someObject) { 
synchronized(this.class) 
{ 
Integer count=objMap.get(someObject); 
if(count==null) 
{ 
    count=0; 
} 
objMap.put(someObject,++count); 
} 

synchronized(objectMap.get(someObject) 
{ 
    outerSystem.process(someObject); 

    Integer count=objMap.get(someObject); 
    if(count>1) 
{ 
    objMap.put(someObject,--count); 
} 
else 
{ 
    objectMap.remove(someObject); 
} 
} 

} 
+0

謝謝你的嘗試,像這樣的東西可能是工作。但我認爲它有更優化的解決方案。這將是很好的實現最大吞吐量。如果我們會對每個動作進行同步/鎖定,可能會大大降低吞吐量 – AskProgram

0

RxJava可以幫助這裏。它非常擅長處理特別涉及異步轉換的數據流,並且在需要時可以在封面下討論排隊(不會通過同步修飾符進行阻塞!)。爲了解決你的問題,我會做這樣的事情:

public class SomeHandler{ 

    private final OuterSystem outerSystem; 

    private final PublishSubject<SomeObject> subject; 

    public SomeHandler() { 
     subject 
      // handle calls from multiple threads (non-blocking) 
      .serialized() 
      // buffer in memory if not keeping up  
      .onBackpressureBuffer() 
      // use equals/hashCode to order messages (the queues you referred to) 
      .groupBy(x -> x) 
      .flatMap(g -> 
       g.doOnNext(x -> outerSystem.process(x)) 
       // process groups in parallel 
       .subscribeOn(Schedulers.computation())) 
      // do something if an error occurs 
      .doOnError(e -> e.printStackTrace()) 
      // start consuming data when arrives 
      .subscribe(); 
    } 

    public void handle(SomeObject someObject) { 
     subject.doOnNext(someObject); 
    } 
} 
+0

謝謝您的回答,它確實可行。但我只需要使用標準的Java庫(這是我的錯誤,我不得不寫它) – AskProgram

0

如果我正確理解你的問題,你需要保證「相等的對象」的串行執行,而「不相等的對象」可以在parallell進行處理。實現這一目標的一種方法是安排N個處理器,並根據對象的某些確定性特徵來分散工作負載。

在你的情況,如果兩個對象是相等的,它們的散列碼必須相等,所以hasCode() modulo N可用於在N個執行人,其中每個執行人只包含單線程分散負載:

public class SomeHandler { 
    static int N = ...; 
    // Each executor is an Executors.newSingleThreadScheduledExecutor() 
    Executor[N] executors = ....; 
    OuterSystem system; 

    public void handle(SomeObject so) { 
     executors[so.hashCode() % N].execute(() -> system.process(so)); 
    } 
} 
+0

好方法。 )應該是順便說一句,因爲OP說'handle'是從多個線程中調用的,所以我會把'SomeHandler'中的所有字段放在最後。 –