2013-07-23 41 views
3

我正在使用OSGI框架來製作嚴重依賴於處理數據包的應用程序。在OSGI中使並行線程運行的最佳方式是什麼使OSGI中的線程並行運行的最佳方式

每個包都處理一個包,然後將其發送到下一個包。我想要的是每個捆綁包都是平行的。所以我想讓每個bundle都運行在它自己的Thread或多個Threads中。 '問題'是OSGI並不真正支持多線程。運行在同一個JVM上的每個bundle僅運行在1個線程中,因此它遵循同步模型。

我的想法: 因此,應用程序的性質有點像生產者消費者喜歡的。 Bundle A提供了一個服務,其中包含一個用於將包發送給A的方法,我們稱之爲方法ain。 Bundle B也有類似的設置,C也是如此。它們都有aa/b/cout方法,在這種方法中它們使用下一個bundle的服務,所以在A.aout中你會像這樣調用bin:bservice.bin(package )。因此,每個bundle都是數據包的使用者和生產者,這使我認爲使用ExecutorService和BlockingQueues可能會起作用,但我不太清楚如何在bundle之間「正確」實現這一點,並且與所有他們既是消費者又是生產者我不太確定這是否是解決這個問題的最佳方式。

我希望你們能幫忙和/或有任何建議。

- 編輯 -

捆綁 AServiceImplementation

public class AServiceImplementation implements AService { 

    private BService bservice; 

    public void bindBService(BService service) { 
     bservice = service; 
     System.out.println("bundle gateway bound to b service"); 
    } 

    public void unbindBService(BService service) { 
     this.bservice = null; 
    } 

    public void process(int id) { 
     bservice.transmitIn(id);   
    } 
} 

捆B BServiceImplementation

public class BServiceImplementation implements BService { 

    private CService cservice; 

    public void bindCService(CService service) { 
     cservice = service; 
     System.out.println("bundle gateway bound to b service");; 
    } 

    public void unbindCService(CService service) { 
     this.cservice = null; 
    } 

    public void transmitIn(int id){ 
     // So if I would implement it THIS is where I would assign the data to 
     // a thread to get it processed in the process method. 
     // but doesn't that make THIS method, transmitIn a bottleneck since all packages 
     // need to pass through here? 
     process(id); 
    } 

    public void process(int id) { 
     // Lots of processing happens here 
    } 
} 

我真的不知道如何使其成爲例如包一個通過transmitIn方法將數據傳輸到捆綁B,而不傳輸是瓶頸,因爲我會讓我的「workdistribution」在該方法不同的線程(如被看見在上面的代碼中)

回答

4

'問題'是OSGI並不真正支持多線程。運行在同一個JVM上的每個bundle僅運行在1個線程中,因此它遵循同步模型。

對不起,但這是一個徹底的誤會。讓我引用OSGi Core規範,第4.2.7節:

OSGi框架正在多線程環境中運行。框架啓動後, 將啓動軟件包,這些軟件包將被激活。激活的捆綁包通常會啓動背景 線程或對來自其他捆綁包的事件作出反應。也就是說,在啓動方法返回後,框架 已轉移到ACTIVE狀態,並且許多捆綁包可能在不同的線程上忙碌。

換句話說,您可以自由創建任何適合您需求的線程解決方案。這不是一個OSGi問題。

UPDATE:

你的服務實現可以共享一個ExecutorService這樣的:

public class BServiceImplementation implements BService { 

private ExecutorService executorService; 

private CService cservice; 

public void bindCService(CService service) { 
    cservice = service; 
    System.out.println("bundle gateway bound to b service"); 
} 

public void bindExecutorService(ExecutorService executorService) { 
    this.executorService = executorService; 
} 

public void unbindCService(CService service) { 
    this.cservice = null; 
} 

public void transmitIn(final int id) { 
    executorService.submit(new Runnable() { 
     @Override 
     public void run() { 
      process(id); 
     } 
    }); 
} 

public void process(int id) { 
    // Lots of processing happens here 
} 
} 

這時正好有另一束暴露出的ThreadPoolExecutor作爲ExecutorService的。

+0

所以如果我只是保持原樣,不增加額外的多線程功能,並且假設5個不同的bundle都使用bundle A,他們都使用他們的a服務調用:servicea.startprocessingdata(包)。捆綁包A將如何迴應?捆綁包A能夠處理這些「一次全部」,還是將它們同步處理? – Spyral

+0

OSGi無法解決您的併發需求。但是您當然可以使用ExecutorService和BlockingQueues以及其他機制來構建滿足您需求的解決方案。 –

+0

通過添加一些代碼,我的問題更加清晰: – Spyral

1

我在想,如果你不能使用下面的代碼行中的激活方法:

public void activate(){ 
new Thread(this).start(); 
} 

現在你只需要讓你捆綁Runnable,並實現run()方法。在這種運行方法中,您可以讓您的消費者端軟件包等待新任務。

然後你讓每個消費者(這是通信包的服務端)定義一個BlockingQueue。當一個bundle(producer-end)綁定到一個特定的服務時,你也要確保引用這個隊列。這樣每個捆綁應該只有一個隊列。

+0

這可能是一個解決方案,但我覺得應該有一個更美好的方式來實現這一點。我確信已經有實現可以使這更容易,就像executorservice – Spyral

9

Ahum,一個完全的誤解。 OSGi並不在乎你如何處理併發,它的一大優點就是它不會像許多應用服務器那樣改變計算模型。從捆綁或包的角度來看,線程完全不相關。所以任何Java解決方案都可以在這裏工

在你的例子中,所有的處理過程都會發生在初始調用者線程,即調用AServiceImplementation的那個線程上。如果你想並行處理,你可以爲你提交的每個任務使用一個執行器,這是Executors派上用場的模式。這使得處理異步,所以你沒有返回值。注意,處理將受到您提供的Executor中可用線程的數量的限制。您還必須非常小心地編寫處理代碼,以處理非局部變量的正確同步。

另一種方法是排隊。每個服務調用都會創建一個包含任務信息的對象,然後將該對象發佈到阻塞隊列中。在OSGi激活器中,您創建一個讀取隊列並處理它的線程。在這個模型中,你可以處理而不用擔心(太多)併發,因爲所有的處理總是發生在同一個線程中。出於性能方面的考慮,你可以啓動多個線程,但處理比較困難。

但是,我的建議是跳過這種優化,並儘可能簡單地建立你的系統。如果它運行,並且你發現你有性能問題,那麼開始擔心這些問題已經足夠早了。根據您當前的理解程度,我擔心您有機會爲「可能」問題創建高度複雜的解決方案。

相關問題