2015-06-30 45 views
-1

我有運行特定問題的算法組合的代碼,然後只要一個算法找到問題的答案,程序就會繼續。投資組合中的其他算法獲得自願信號終止,並且執行的主要線程繼續。可運行的最終變量的空指針異常

此代碼的一個用戶正在向我發送一個帶有NullPointerException的棧跟蹤 「resultReference.set(solverResult);」 從以下代碼可以看出,resultReference是一個最終變量,並且會立即初始化。我不知道它有可能變成空白。我花了很長時間試圖在我的結尾重現問題無濟於事。用戶堆棧跟蹤上的行號與我的代碼上的行號相匹配。用戶報告在3種不同的場合看到了錯誤,但很少(每次問題解決時都不會發生這種情況),所以也許會出現某種競爭狀態。這是jdk 1.8_25。

我是否正確地認爲這個錯誤應該是不可能的,因爲變量是最終的?我不確定該堆棧跟蹤應該怎麼做,並且希望有一些讓人放心的是它應該是不可能的。

public class ParallelSolver { 

private final ListeningExecutorService executorService; 
private final AtomicReference<Throwable> error; 
private final List<Solver> solvers; 
private static final org.slf4j.Logger log = org.slf4j.LoggerFactory.getLogger(ParallelSolver.class); 

public ParallelSolver(int threadPoolSize, List<Solvers> solvers) { 
    executorService = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(threadPoolSize)); 
    error = new AtomicReference<>(); 
    this.solvers = solvers; 
} 

public SolverResult solve(Problem p) { 
    final AtomicReference<SolverResult> resultReference = new AtomicReference<>(); 
    final List<Future> futures = new ArrayList<>(); 
    final Semaphore workDone = new Semaphore(0); 
    try { 
     // Submit one job per each solver in the portfolio 
     solvers.forEach(solver -> { 
      final ListenableFuture<Void> future = executorService.submit(() -> { 
       SolverResult solverResult = solver.solve(p); 
       if (solverResult.isConclusive()) { 
        log.debug("Signalling the blocked thread to wake up!"); 
        // NPE HERE ON THIS LINE 
        resultReference.set(solverResult); 
        workDone.release(solvers.size()); 
       } 
       log.debug("Releasing a single permit as the work for this thread is done."); 
       workDone.release(1); 
       log.debug("Job ending..."); 
       return null; 
      }); 
      futures.add(future); 
      Futures.addCallback(future, new FutureCallback<Void>() { 
       @Override 
       public void onSuccess(Void result) { 

       } 

       @Override 
       public void onFailure(Throwable t) { 
        if (t instanceof CancellationException) { 
         return; 
        } 
        error.compareAndSet(null, t); 
        // Wake up the main thread (if it's still sleeping) 
        workDone.release(solvers.size()); 
       } 
      }); 
     }); 
     // Wait for a thread to complete solving and signal you, or all threads to timeout 
     log.debug("Main thread going to sleep"); 
     workDone.acquire(solvers.size()); 
     log.debug("Main thread waking up, checking for errors then cancelling futures"); 
     checkForErrors(); 
     // cancel any still to be launched futures 
     futures.forEach(future -> future.cancel(false)); 
     log.debug("Returning now"); 
     return resultReference.get() == null ? SolverResult.createTimeoutResult() : resultReference.get(); 
    } catch (InterruptedException e) { 
     throw new RuntimeException("Interrupted while running parallel job", e); 
    } 
} 

/** 
* We want a fail-fast policy, but java executors aren't going to throw the exception on the main thread. 
* We can't call Future.get() and check for errors, because that might block. 
* So we set a variable when an error occurs, and check it here. 
*/ 
private void checkForErrors() { 
    if (error.get() != null) { 
     log.error("Error occured while executing a task", error.get()); 
     throw new RuntimeException("Error occurred while executing a task", error.get()); 
    } 
} 
+0

寫的代碼,'solvers'永遠不會被初始化,所以它總是'null'。我不知道'workDone'是什麼。 – FDinoff

+0

謝謝,糾正 - 複製和粘貼代碼中的問題。 workDone是一個信號量,用於確保主線程阻塞,直到每個人都超時或者一個算法找到答案。 – newmanne

+1

你可以發佈完整的堆棧跟蹤嗎?一個問題是NPE是源於標記行還是源自AtomicReference代碼的某處。 –

回答

1

這裏幾乎是你想要什麼用阿卡:

import java.util.ArrayList; 
import java.util.List; 
import java.util.concurrent.TimeUnit; 

import scala.concurrent.Await; 
import scala.concurrent.Future; 
import akka.actor.ActorRef; 
import akka.actor.ActorSystem; 
import akka.actor.Props; 
import akka.actor.UntypedActor; 
import akka.pattern.Patterns; 
import akka.util.Timeout; 


public class AlgorithmTester extends UntypedActor 
{ 
    public AlgorithmTester(){} 

    public static class RegisterResultListener 
    { 

    } 

    public static class Result 
    { 
     final double result; 
     public Result(double result) 
     { 
      this.result = result; 
     } 
    } 

    public static interface Algorithmable 
    { 
     public Result solve(); 
    } 

    @SuppressWarnings("serial") 
    public static class AlgorithmsToTest extends ArrayList<Algorithmable> { 
    } 

    public static class AlgorithmRunner extends UntypedActor 
    { 

     public AlgorithmRunner(){} 

     @Override 
     public void onReceive(Object msg) throws Exception 
     { 
      if (msg instanceof Algorithmable) 
      { 
       Algorithmable alg = (Algorithmable) msg; 
       getSender().tell(alg.solve(), getSelf()); 
      } 
     } 
    } 

    List<ActorRef> runners = new ArrayList<ActorRef>(); 
    List<ActorRef> resultListeners = new ArrayList<ActorRef>(); 

    @Override 
    public void onReceive(Object msg) throws Exception 
    { 

     if (msg instanceof RegisterResultListener) 
     { 
      resultListeners.add(getSender()); 
     } 
     else if (msg instanceof AlgorithmsToTest) 
     { 
      AlgorithmsToTest algorithms = (AlgorithmsToTest) msg; 
      for (Algorithmable algorithm : algorithms) 
      { 
       ActorRef runner = getContext().actorOf(Props.create(AlgorithmRunner.class)); 
       runners.add(runner); 
       runner.tell(algorithm, getSelf()); 
      } 
      getSelf().tell(new RegisterResultListener(), getSender()); 
     } 
     else if (msg instanceof Result) 
     { 
      for (ActorRef runner : runners) 
      { 
       getContext().stop(runner); 
      } 
      runners.clear(); 

      for (ActorRef l : resultListeners) 
      { 
       l.tell(msg, getSelf()); 
      } 
     } 
    } 


    public static void main(String[] args) 
    { 
     ActorSystem system = ActorSystem.create("AlogrithmTest"); 
     ActorRef tester = system.actorOf(Props.create(AlgorithmTester.class), "algorithmTest"); 

     Algorithmable a1 = new Algorithmable() 
     { 

      public Result solve() { 
       try { 
        Thread.sleep(7000); 
       } catch (InterruptedException e) { 
        // TODO Auto-generated catch block 
        e.printStackTrace(); 
       } 
       return new Result(1100.0); 
       } 
     }; 

     Algorithmable a2 = new Algorithmable() 
     { 

      public Result solve() { 
       try { 
        Thread.sleep(6000); 
       } catch (InterruptedException e) { 
        // TODO Auto-generated catch block 
        e.printStackTrace(); 
       } 
       return new Result(330.0); 
       } 
     }; 

     Algorithmable a3 = new Algorithmable() 
     { 

      public Result solve() { 
       try { 
        Thread.sleep(1000); 
       } catch (InterruptedException e) { 
        // TODO Auto-generated catch block 
        e.printStackTrace(); 
       } 
       return new Result(1000); 
       } 
     }; 

     AlgorithmsToTest algorithmsToTest = new AlgorithmsToTest(); 
     algorithmsToTest.add(a1); 
     algorithmsToTest.add(a2); 
     algorithmsToTest.add(a3); 

     Timeout t = new Timeout(5, TimeUnit.SECONDS); 

     Future<Object> future = Patterns.ask(tester, algorithmsToTest, 100000); 
     try { 
      Result response = (Result)Await.result(future, t.duration()); 
      System.out.println(response.result); 
     } catch (Exception e) { 
      // TODO Auto-generated catch block 
      e.printStackTrace(); 
     } 
     System.out.println("Continuing on"); 
     system.terminate(); 
     System.out.println("Terminated"); 
    } 
} 

但是存在阿卡殺演員,而他們正在處理的消息沒辦法,你會注意到,這個程序繼續而執行演員正在處理其他算法,即使已經找到第一個答案。 殺死線程永遠不會很好,所以對你的問題沒有很好的解決方案。你可以在我猜測的主要方法的末尾添加一個System.exit(0)標籤,或者在算法的某個地方有一個可怕的原子變量,它們在迭代並拋出異常,或者將它們作爲線程並殺死它們,而不是非常好): 我個人我會使用System.exit(0),如果你能擺脫它。

編輯:好感謝downvote無故。看這是你想做的替代代碼,沒有atmoic變量,原子變量和鎖,以及其他所有東西都是非常危險和容易出錯的,這是一個更清晰的答案,downvoting這是絕對廢話,所有這一切需要改變這個代碼來匹配你想要的僅僅是Result或者Algorithmable接口,並且提供你想要的所有實現,而這正是你所要求的。事實上,你只是downvote沒有評論意味着你不知道如何使用計算器。如果這個downvote推斷你認爲原子變量比基於actor的模型更受歡迎,那麼我建議你做一些閱讀。 Asker甚至沒有給出空指針異常的堆棧跟蹤,所以不可能直接解決這個問題,nextime花費0.01%的時間讓我寫這個答案在downvote之前寫評論。