2013-10-15 16 views
2

我開始了我的情況下一夜之間,看看它是如何處理的事情,當我來到這個早上,我正面臨的OutOfMemoryError:無法創建使用ExecutorServie

Exception in thread "pool-535-thread-7" java.lang.OutOfMemoryError: unable to create new native thread 
    at java.lang.Thread.start0(Native Method) 
    at java.lang.Thread.start(Thread.java:691) 
    at java.util.concurrent.ThreadPoolExecutor.addWorker(ThreadPoolExecutor.java:943) 
    at java.util.concurrent.ThreadPoolExecutor.processWorkerExit(ThreadPoolExecutor.java:992)[info] application - Connecting to server A 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603) 
    at java.lang.Thread.run(Thread.java:722) 

我的代碼的目的很新的本地線程簡單:每5分鐘,我連接到一個遠程服務器列表,發送一個請求(通過套接字),就是這樣。

這裏是我的代碼:

我 「的cron」 任務:

/** will create a new instance of ExecutorService every 5 minutes, loading all the websites in the database to check their status **/ 
/** Maybe that's where the problem is ? I need to empty (GC ?) this ExecutorService ? **/ 
Akka.system().scheduler().schedule(
    Duration.create(0, TimeUnit.MILLISECONDS), // Initial delay 0 milliseconds 
    Duration.create(5, TimeUnit.MINUTES),  // Frequency 5 minutes 
    new Runnable() { 
     public void run() { 
      // We get the list of websites to check 
      Query<Website> query = Ebean.createQuery(Website.class, "WHERE disabled = false AND removed IS NULL"); 
      query.order("created ASC"); 
      List<Website> websites = query.findList(); // Can be 1, 10, 100, 1000. In my test case, I had only 9 websites. 

      ExecutorService executor = Executors.newFixedThreadPool(NTHREDS); 
      for (Website website : websites) { 
       CheckWebsite task = new CheckWebsite(website); 
       executor.execute(task); 
      } 

      // This will make the executor accept no new threads 
      // and finish all existing threads in the queue 
      executor.shutdown(); 
     } 
    }, 
    Akka.system().dispatcher() 
); 

我CheckWebsite類:

public class CheckWebsite implements Runnable { 
    private Website website; 

    public CheckWebsite(Website website) { 
     this.website = website; 
    } 

    @Override 
    public void run() { 
     WebsiteLog log = website.checkState(); // This is where the request is made, I copy paste the code just after 
     if (log == null) { 
      Logger.error("OHOH, WebsiteLog should not be null for website.checkState() in CheckWebsite class :s"); 
      return; 
     } 

     try { 
      log.save(); 
     catch (Exception e) { 
      Logger.info ("An error occured :/"); 
      Logger.info(e.getMessage()); 
      e.printStackTrace(); 
     } 
    } 
} 

checkState()方法Website.class

public WebsiteLog checkState() { 
    // Since I use Socket and the connection can hang indefinitely, I use an other ExecutorService in order to limit the time spent 
    // The duration is defined via Connector.timeout, Which will be the next code. 

    ExecutorService executor = Executors.newFixedThreadPool(1); 

    Connector connector = new Connector(this); 
    try { 
     final long startTime = System.nanoTime(); 

     Future<String> future = executor.submit(connector); 
     String response = future.get(Connector.timeout, TimeUnit.MILLISECONDS); 

     long duration = System.nanoTime() - startTime; 

     return PlatformLog.getLastOccurence(this, response, ((int) duration/ 1000000)); 
    } 
    catch (Exception e) { 
     return PlatformLog.getLastOccurence(this, null, null); 
    } 
} 

這裏是Connector.class。我在這裏刪除了無用的部分(如Catches):

public class Connector implements Callable<String> { 
    public final static int timeout = 2500; // WE use a timeout of 2.5s, which should be enough 

    private Website website; 

    public Connector(Website website) { 
     this.website = website; 
    } 

    @Override 
    public String call() throws Exception { 
     Logger.info ("Connecting to " + website.getAddress() + ":" + website.getPort()); 
     Socket socket = new Socket(); 

     try { 
      socket.connect(new InetSocketAddress(website.getIp(), website.getPort()), (timeout - 50)); 
      BufferedReader input = new BufferedReader(new InputStreamReader(socket.getInputStream())); 
      String response = input.readLine(); 
      socket.close(); 

      return response; 
     } 
     catch (Exception e) { 
      e.printStackTrace(); 
      throw e; 
     } 
     finally { 
      // I take the precaution to close the socket here in order to avoid a memory leak 
      // But if the previous ExecutorService force the close of this thread before 
      // I can't guarantee it will be closed :/ 
      if (socket != null && !socket.isClosed()) { 
       socket.close(); 
      } 
     } 
    } 
} 

我是新來的Java多線程,所以我可能犯了很大的錯誤。我懷疑一些區域,可能是潛在的原因,但我缺乏知識的要求我請求你們的幫助:)

作爲總結,這裏的潛力方面:

  1. 創建一個新的ExecutorService每5分鐘。也許我可以重用舊的?或者完成後是否需要關閉當前的(如果是這樣,如何?)。
  2. 我創建一個ExecutorService,將創建一個ExecutorService(在checkstate()法)
  3. ,連接器類可以是(暴力)的事實由ExecutorService停止運行它,如果時間過長,造成的事實一個沒有關閉的套接字(然後是內存泄漏)?

另外,正如您所看到的,線程「pool-535-thread-7」發生異常,這意味着它很快就不會發生。

我將last_occured檢查存儲在數據庫中,並創建日誌條目(在WebsiteLog中),增量大約爲5個小時(所以,每5分鐘一次,該線程在大約60次調用後崩潰)。

更新:這裏是重新checkState方法包括關閉調用:

public PlatformLog checkState() { 
    ExecutorService executor = Executors.newFixedThreadPool(1); 

    Connector connector = new Connector(this); 
    String response = null; 
    Long duration = null; 

    try { 
     final long startTime = System.nanoTime(); 

     Future<String> future = executor.submit(connector); 
     response = future.get(Connector.timeout, TimeUnit.MILLISECONDS); 

     duration = System.nanoTime() - startTime; 
    } 
    catch (Exception e) {} 

    executor.shutdown(); 
    if (duration != null) { 
     return WebsiteLog.getLastOccurence(this, response, (duration.intValue()/ 1000000)); 
    } 
    else { 
     return WebsiteLog.getLastOccurence(this, response, null); 
    } 
} 
+0

如果您認爲自己找到了解決方法,爲何不測試它?將您的cron工作更改爲每1分鐘(或更少?),並且您可以在一個小時內達到60個電話... –

+0

這很聰明! :我會那樣做。 - 30分鐘見;)(每30秒) –

回答

6

我不知道這是唯一的問題,但你在你的checkState()方法創建ExecutorService但你不關閉了。

按照JavaDoc中Executors.newFixedThreadPool()

The threads in the pool will exist until it is explicitly shutdown.

的線程活着會導致ExecutorServicenot to be garbage collected(這將調用shutdown()代表你所以你每次這個被調用時泄漏線程。

+0

哦,是的,我忘了關閉ExecutorService,謝謝你。但關於連接器,我該如何退出?我在嘗試中做了一個回報,但是最終沒有任何結果。我是否也應該在這裏做點什麼? –

+0

@CyrilN。不,對我來說看起來很好。您要麼返回結果,要麼拋出異常,這是正確的行爲。如果不是,它不會編譯! –

+0

我更新了我的問題以顯示'checkState()'的重訪版本。你覺得這樣更好嗎?順便說一下,如果我在'Connector.class'中拋出異常,我將永遠不會去finally塊(所以我永遠不會關閉套接字)? –