2012-07-03 52 views
0

我正在尋找寫一些將處理事件的併發代碼。此處理可能需要很長時間。Executor /隊列處理最後一個已知任務只

雖然該事件正在處理,它應記錄傳入的事件,然後處理最後傳入的事件,當它可以再次運行時。 (其他事件可以扔掉)。這有點像FILO隊列,但我只需要在隊列中存儲一個元素。

理想情況下,我想將我的新Executor插入到下面顯示的事件處理體系結構中。

public class AsyncNode<I, O> extends AbstractNode<I, O> { 
    private static final Logger log = LoggerFactory.getLogger(AsyncNode.class); 
    private Executor executor; 

    public AsyncNode(EventHandler<I, O> handler, Executor executor) { 
     super(handler); 
     this.executor = executor; 
    } 

    @Override 
    public void emit(O output) { 
     if (output != null) { 
      for (EventListener<O> node : children) { 
       node.handle(output); 
      } 
     } 
    } 

    @Override 
    public void handle(final I input) { 

     executor.execute(new Runnable() { 

      @Override 
      public void run() { 
       try{ 
       emit(handler.process(input)); 
       }catch (Exception e){ 
        log.error("Exception occured whilst processing input." ,e); 
        throw e; 
       } 

      } 
     }); 

    } 

}

+0

您是否希望此執行程序正常處理提交給它的其他任務?或者這個執行者只會被用來處理你的事件嗎? – shams

+0

它應該始終執行最新的已知任務,並忽略已排隊但未執行的中間任務。我提出了我的解決方案,但還沒有測試過。 –

+0

那麼這意味着執行程序一次只能主動執行一項任務,而不考慮您啓動執行程序的線程數是多少? – shams

回答

0
public class LatestTaskExecutor implements Executor { 
    private final AtomicReference<Runnable> lastTask =new AtomicReference<>(); 
    private final Executor executor; 

    public LatestTaskExecutor(Executor executor) { 
     super(); 
     this.executor = executor; 
    } 

    @Override 
    public void execute(Runnable command) { 
     lastTask.set(command); 
     executor.execute(new Runnable() { 
      @Override 
      public void run() { 
       Runnable task=lastTask.getAndSet(null); 
       if(task!=null){ 
        task.run(); 
       } 
      } 
     }); 

    } 
} 

@RunWith(MockitoJUnitRunner.class) 
public class LatestTaskExecutorTest { 

    @Mock private Executor executor; 
    private LatestTaskExecutor latestExecutor; 
    @Before 
    public void setup(){ 
     latestExecutor=new LatestTaskExecutor(executor); 
    } 
    @Test 
    public void testRunSingleTask() { 
     Runnable run=mock(Runnable.class); 
     latestExecutor.execute(run); 
     ArgumentCaptor<Runnable> captor=ArgumentCaptor.forClass(Runnable.class); 
     verify(executor).execute(captor.capture()); 
     captor.getValue().run(); 
     verify(run).run(); 
    } 

    @Test 
    public void discardsIntermediateUpdates(){ 
     Runnable run=mock(Runnable.class); 
     Runnable run2=mock(Runnable.class); 
     latestExecutor.execute(run); 
     latestExecutor.execute(run2); 
     ArgumentCaptor<Runnable> captor=ArgumentCaptor.forClass(Runnable.class); 
     verify(executor,times(2)).execute(captor.capture()); 
     for (Runnable runnable:captor.getAllValues()){ 
      runnable.run(); 
     } 
     verify(run2).run(); 
     verifyNoMoreInteractions(run); 
    } 
} 
2

我不會做任何。我想要一個AtomicReference來處理你想要處理的事件並添加一個任務來以破壞性的方式處理它。

final AtomicReference<Event> eventRef = 

public void processEvent(Event event) { 
    eventRef.set(event); 
    executor.submit(new Runnable() { 
     public vodi run() { 
      Event e = eventRef.getAndSet(null); 
      if (e == null) return; 
      // process event 
     } 
    } 
} 

這將只處理下一個事件時,執行是免費的,無需定製遺囑執行人或隊列(可用於其他事情)

這也擴展到具有鍵控事件也就是說你想處理密鑰的最後一個事件。

+0

我正在考慮圍繞這個包裝一個Executor實現,所以我可以在不需要編寫任何代碼的情況下將Executor重用於不同的場景。我需要經常爲不同的事件類型和不同的處理做到這一點。 –

+0

對於不同類型,您可以將EventType映射爲AtomicReference值。 –

+0

我想我的意思是說我想把這個邏輯與處理邏輯完全分開。 –

0

這個答案是來自DD的修改版本,它最小化了多餘任務的提交。

atomic reference用於追蹤最新事件。將自定義任務提交給隊列以處理事件,只有最先讀取最新事件的任務纔會執行,並在將原子引用清除爲空之前執行有用的工作。當其他任務有機會運行並且找不到可處理的事件時,他們只會無所事事並靜靜地消失。通過跟蹤隊列中可用任務的數量來避免提交多餘的任務。如果隊列中至少有一個待處理任務,那麼我們可以避免提交任務,因爲當已排隊的任務已經出隊時,事件將被處理。

import java.util.concurrent.Executor; 
import java.util.concurrent.atomic.AtomicInteger; 
import java.util.concurrent.atomic.AtomicReference; 

public class EventExecutorService implements Executor { 

    private final Executor executor; 
    // the field which keeps track of the latest available event to process 
    private final AtomicReference<Runnable> latestEventReference = new AtomicReference<>(); 
    private final AtomicInteger activeTaskCount = new AtomicInteger(0); 

    public EventExecutorService(final Executor executor) { 
     this.executor = executor; 
    } 

    @Override 
    public void execute(final Runnable eventTask) { 
     // update the latest event 
     latestEventReference.set(eventTask); 
     // read count _after_ updating event 
     final int activeTasks = activeTaskCount.get(); 

     if (activeTasks == 0) { 
      // there is definitely no other task to process this event, create a new task 
      final Runnable customTask = new Runnable() { 
       @Override 
       public void run() { 
        // decrement the count for available tasks _before_ reading event 
        activeTaskCount.decrementAndGet(); 
        // find the latest available event to process 
        final Runnable currentTask = latestEventReference.getAndSet(null); 
        if (currentTask != null) { 
         // if such an event exists, process it 
         currentTask.run(); 
        } else { 
         // somebody stole away the latest event. Do nothing. 
        } 
       } 
      }; 
      // increment tasks count _before_ submitting task 
      activeTaskCount.incrementAndGet(); 
      // submit the new task to the queue for processing 
      executor.execute(customTask); 
     } 
    } 
} 
+0

這是預先優化有罪嗎? –

+0

@DD。它基於任務長時間運行的信息,並且可能會跳過很多事件。 – shams

相關問題