2014-03-05 67 views
1

我一直在想不同的方法來實現Java中的異步處理。我想到了一些可能性,並想知道它們中的一些是否比其他一些更好,並且可能會就如何做到這一點獲得進一步的建議。想到的最常見的用例是使用以下API void sendData(Data data)或甚至void sendData(Data data, Handler handler)通過連接(例如TCP)發送數據包。這裏有一些想法,我有:如何實現異步計算?

  1. 專用數據發送環路 - 有一個didicated線程,其行爲有點像Java事件分派線程,所有其他線程調用並提交請求。這些請求存儲在一個隊列中,並定期排空該隊列,併發送所有請求。
  2. 使用後臺線程來排空隊列--Connection可以維護掛起的請求列表,並有一個後臺線程來排隊(有一些同步)
  3. 執行程序服務 - 將請求傳遞給服務並調用處理程序從後臺線程。
  4. 異步通道 - 最高層的辦法,委派到實施

是任何那些更好的使用,或者如果你心裏有任何其他的一般想法,請分享您的意見。

+1

['FutureTask'](http://docs.oracle.com/javase/7/docs/api/java/util/concurrent/FutureTask.html):「一個可取消的異步計算。」 – 2rs2ts

回答

1

你的大部分建議實際上是打扮同一件事的不同方式。

在幕後執行程序服務具有一個或多個後臺線程池,用於排空隊列。請求被提交給它。

1和2是說的兩種方式「排隊的東西做,有一個處理該線程」

所以基本上1和2是相互的兩種變體。它們也是3.的子集。

對於4.我不確定你在這裏是什麼意思?

ExecutorService設計用於完成您正在嘗試執行的任務 - 如果您有多個線程來執行任務,那麼顯然是這樣做的方法。

如果你只有一個線程,你仍然可以使用ExecutorService,但選擇不太清楚。使用一個線程和一個BlockingQueue這將是相當簡單的,所以這可能是一條可行的路。

0

我會一直使用executor服務來做併發。它們的高級別足以隱藏管理線程的複雜性並允許線程重用。您可以將任務提交給執行程序,該執行程序將充當隊列,或者運行許多使用同步隊列(如阻塞隊列)的任務來共享數據。後者可以允許更多的靈活性,例如批量隊列項目。

我也強烈推薦番石榴ListenableFuture,因爲它可能解決您在處理併發時可能遇到的很多問題。

0

您應該清楚地將異步管道設計爲具有數據/事件依賴關係的圖形。典型的圖形包括兩種節點的:

fast handler: 
    immediately invoked when an event happens and: 
    probably stores the event, 
    and/or calls another fast handler, 
    and/or submits an asynchronous task to an executor 

task: 
    runs and issues events (that is, calls fast handlers) 

所以基本上你需要開發兩個獨立的層:快速處理程序和任務執行。執行程序是通用的,可以從java.util.concurrent包中獲取。快速處理程序很大程度上取決於您的問題域,並且不存在適用於所有情況的通用庫。例如,純隊列是一個快速處理程序,它只存儲事件,因此幾乎是無用的。

如果您使用I/O,則需要使用標準I/O庫,以便它們爲您的處理程序發出I/O事件。它可以使用同步I/O的線程構建,也可以使用Selector線程或異步I/O的異步通道構建。與NIO2異步通道使用

快速處理程序例如:

class ConnectionAcceptor implements CompletionHandler<AsynchronousSocketChannel, Void>{ 
    AsynchronousServerSocketChannel assc; 
    int maxConn;// max number of simultaneous connections 
    int connCount=0; 

    /* called on creation to start listening incoming client connection requests */ 
    void allowAccept() { 
     assc.accept(null, this); 
    } 

    /* called by I/O layer when a client connection requested */ 
    public synchronized void completed(AsynchronousSocketChannel result, Void attachment) { 
     executor.exec(new Connection(result)); 
     connCount++; 
     if (connCount<maxConn) { 
     allowAccept(); 
     } 
    } 

    /* called by Connection when it is closed */ 
    synchronized void connClosed() { 
     if (connCount==maxConn) { 
     allowAccept(); 
     } 
     connCount--; 
    } 
} 

該處理器具有3個入口點和處理3類型的事件。相似的,處理器可以創建讀或寫(但不是兩者)。它的內部狀態不是connCount,而是包含一個指示I/O操作正在進行的布爾標誌,以及等待AsynchronousSocketChannel結束啓動操作的緩衝區隊列。