2013-07-30 63 views
0

我試圖實現的OSGi服務應該從另一個bundle等待傳入的數據,當它接收它處理數據。 我使用的是LinkedBlockingQueue,因爲我不知道我會收到多少個數據包。 我的代碼如下所示:LinkedBlockingQueue.take()似乎是閒着

public class MyClass { 

protected static LinkedBlockingQueue<JSONObject> inputQueue = new LinkedBlockingQueue<JSONObject>(); 
private ExecutorService workerpool = Executors.newFixedThreadPool(4); 

public void startMyBundle() { 
    start(); 
} 

protected void start() { 
    new Thread(new Runnable() { 
     public void run() { 
      while(true){ 
       workerpool.execute(new Runnable() { 
        public void run() { 
         try { 
          process(inputQueue.take()); 
         } catch (InterruptedException e) { 
          System.out.println("thread was interrupted."); 
         } 
        } 
       }); 
      } 
     } 
    }).start(); 
} 

public void transmitIn(JSONObject packet) { 
    try { 
     inputQueue.put(packet); 
    } catch (InterruptedException e) { 

    } 
} 

protected void process(JSONObject packet) { 
    //Some processing 
} 

當我運行此,只發送一個數據包到服務,該分組首先和處理,因爲它應該,但後來我的處理器使用其所有能力最當時我得到一個OutOfMemoryError看起來像這樣:

java.lang.OutOfMemoryError thrown from the UncaughtExceptionHandler in thread "[Timer] - Periodical Task (Bundle 46) (Bundle 46)" 

可以。這是什麼原因?

回答

1

你得到,因爲這些代碼行的內存溢出異常:

while(true){ 
    workerpool.execute(new Runnable() { 
    ... 

這個不斷旋轉創建新Runnable實例和將它們添加到線程池的任務隊列中。這些進入無限的隊列並迅速填滿內存。

我想你想要一個4線程在while (true)循環中調用inputQueue.take()

for (int i = 0; i < 4; i++) { 
    workerpool.execute(new Runnable() { 
     public void run() { 
      while (!Thread.currentThread().isInterrupted()) { 
       process(inputQueue.take()); 
      } 
     } 
    }); 
} 
// remember to shut the queue down after you've submitted the last job 
workerpool.shutdown(); 

此外,您不需要Thread將任務提交到線程池。這是一個非阻塞操作,所以可以直接由調用者完成。

0

這段代碼是罪魁禍首:

protected void start() { 
    new Thread(new Runnable() { 
     public void run() { 
      while(true){ 
       workerpool.execute(new Runnable() { 
        public void run() { 
         try { 
          process(inputQueue.take()); 
         } catch (InterruptedException e) { 
          System.out.println("thread was interrupted."); 
         } 
        } 
       }); 
      } 
     } 
    }).start(); 
} 

它所做的是創建一個後臺任務增加了Runnable無限多的ExecutorService工作隊列。這最終導致OOME。

你的意思做,我想,是:

protected void start() { 
    for (int i = 0; i < 4; ++i) { 
     workerpool.execute(new Runnable() { 
      public void run() { 
       while (true) { 
        try { 
         process(inputQueue.take()); 
        } catch (InterruptedException e) { 
         //killed, exit. 
         return; 
        } 
       } 
      } 
     }); 
    } 
} 

即在等待輸入的ExecutorService上運行4名工作人員。

0

好,有點迂腐但由於這是一個OSGi標籤問題......

  1. 清理 - 你創建一個線程,執行服務,但從來沒有清理它。一般情況下,您需要一對激活/關閉方法,並且在停用後不會遺留任何遺蹟。從凝聚力的角度來看,您喜歡將其看作是一個對象,而不需要一箇中心點來管理它。聲明式服務非常適合這種模式。
  2. 分享 - 一般而言,您希望與他人共享執行者,it is best to get an Executor from the service registry。這將允許部署者根據系統中所有軟件包的使用情況來調整線程的數量。

還有一件事,鮑里斯給了一個正確的解決方案,但它不是非常有效,因爲它總是佔用4個線程和一個無界的LinkedQueue。更糟的是,代碼像服務一樣行走,它像一種服務一樣對話,但似乎並沒有被用作服務。我認爲我們可以做得更好,因爲隊列+執行器有點翻倍,在OSGi中這應該是一項服務。

@Component 
public class JSONPackageProcessor implement TransmitIn { 
    Executor executor; 

    @Reference void setExecutor(Executor e) { this.executor = e; } 

    public void transmitIn(final JSONPacket packet) { 
    executor.execute(new Runnable() { 
     public void run() { 
     try { process(packet); } 
     catch(Throwable t) { log(packet, t); } 
     } 
    } 
    } 

    void process(JSONPacket packet) { ... } 
} 

這不需要清理,假設process(...)總是'很快'結束。在這個模型中,流程不像您對池中的(任意的?)4工作線程所做的那樣受到限制。執行程序的內部隊列用於緩衝。您可以限制此如下:關係前後

@Activate void configure(Map<String,Object> map) throws Exception { 
    if (map.containsKey("throttle")) 
    throttle = new Semaphore(map.get("throttle")); 
} 

這段代碼的好處是,大多數錯誤情況都記錄下來,併發/:

Semaphore throttle= new Semaphore(4) 

    public void transmitIn(final JSONPacket packet) throws Exception { 
    throttle.acquire(); 
    executor.execute(new Runnable() { 
     public void run() { 
     try { process(packet); } 
     catch(Throwable t) { log(packet, t); } 
     finally { throttle.release(); } 
    } 
    } 

你甚至可以通過配置管理員配置此很容易是正確的,因爲你在OSGi中獲得的保證。這段代碼實際上可以像現在這樣工作(不保證一些拼寫錯誤,實際上並沒有運行它)。