2011-06-30 24 views
27

當您需要執行I/O(即數據庫操作)時,actor模型(在Akka中)如何工作?I/O如何在阿卡工作?

我的理解是阻塞操作會拋出一個異常(並且由於Akty使用Netty的均勻性質而導致所有併發性基本上被破壞)。因此,我將不得不使用Future或類似的東西 - 但我不明白併發模型。

  1. 1個actor是否可以同時處理多個消息?
  2. 如果一個演員在future(即future.get())中發起阻止呼叫,是否阻止當前演員的執行;還是會阻止所有參與者執行,直到阻止呼叫完成?
  3. 如果它阻止所有執行,如何使用未來的輔助併發(即不會在將來調用阻塞調用仍等於創建一個參與者並執行阻塞調用)?
  4. 什麼是處理多階段過程(即從數據庫中讀取;調用阻塞Web服務;從數據庫中讀取數據;寫入數據庫)的最佳方式,其中每個步驟都依賴於最後一個階段?

的基本背景是這樣的:

  • 我使用的WebSocket服務器,這將維持數千個會話。
  • 每個會話都有一些狀態(即驗證細節等)。
  • Javascript客戶端將發送一個JSON-RPC消息給服務器,服務器會將它傳遞給相應的會話參與者,該參與者將執行它並返回結果。
  • 執行RPC調用將涉及一些I/O並阻止調用。
  • 會有大量的併發請求(每個用戶將通過WebSocket連接發出大量請求,並且會有很多用戶)。

有沒有更好的方法來實現這一目標?

+0

我很驚訝沒有人提到使用類似於Node.js/Twisted/gevent等方法的異步IO。 –

回答

27

阻止操作不會在Akka中拋出異常。你可以阻止來自演員的電話(你可能想要最小化,但多數民衆贊成在另一個故事)。

  1. 不,1個演員實例不能。
  2. 它不會阻止任何其他演員。您可以通過使用特定的分派器來影響這一點。期貨使用默認的分派器(通常驅動的全局事件),因此它在池中的線​​程上運行。你可以選擇你想用於演員(每個演員或所有演員)的調度員。我猜如果你真的想創建一個問題,你可能能夠將完全相同的(基於線程)調度程序傳遞給期貨和演員,但這需要你的一些意圖。我想如果你有大量的期貨無限期阻止,並且executorservice已經配置爲固定數量的線程,那麼你可能會炸燬executorservice。所以很多'ifs'。 f.get只有在未來尚未完成時纔會阻止。它會阻止你從中調用它的演員的'當前線程'(如果你從一個演員調用它,這是不必要的)
  3. 你不一定要阻止。你可以使用回調而不是f.get。你甚至可以在沒有阻礙的情況下撰寫期貨。查看Viktor關於'akka的有希望的未來'的討論以獲得更多詳細信息:http://skillsmatter.com/podcast/scala/talk-by-viktor-klang
  4. 我會在步驟之間使用異步通信(如果步驟本身是有意義的流程),那麼在每個步驟中使用actor每個演員都會向下一個發送單向消息,也可能向其他演員發送單向消息,而這些消息不會阻止可以監控該過程的其他演員。通過這種方式,您可以創建演員鏈,其中可以創建許多演員,在演員面前可以放置負載平衡演員,這樣,如果一個演員在一個鏈中阻塞,另一個演員可能不在另一個鏈中。這也適用於你的「背景」問題,將工作量傳遞給本地參與者,並將其鏈接到負載均衡角色後面。

至於netty(並且我假設你是指遠程演員,因爲這是在Akka中唯一使用netty的),將你的工作儘快傳遞給本地演員或未來(與回調),如果你擔心時間或防止netty以某種方式做它的工作。

10

阻止操作通常不會拋出異常,但等待未來(例如通過使用!!!!!發送方法)可能會引發超時異常。這就是爲什麼你應該儘可能堅持儘可能多地使用fire-and-forget,使用一個有意義的超時值,並且儘可能使用回調。

  1. 的阿卡演員不能明確過程中的幾個消息在一排,但你可以通過配置文件中的throughput價值發揮。如果它的消息隊列不是空的,則演員將處理幾條消息(即,其接收方法將被多次連續調用):http://akka.io/docs/akka/1.1.3/scala/dispatchers.html#id5

  2. 參與者內部的阻止操作不會「阻止」所有參與者,但如果共享執行者之間的線程(推薦使用),調度程序的其中一個線程將被阻塞,直到操作恢復。所以儘可能地嘗試撰寫期貨,並提防超時價值)。

3和4.我同意雷蒙德的答案。

1

雷蒙德和範式說了什麼,還有,如果你想避免扼殺線程池,你應該在scala.concurrent.blocking中封裝任何阻塞操作。

當然,最好避免阻塞操作,但有時您需要使用阻塞的庫。如果你將代碼包裝在blocking中,它會讓執行環境知道你可能會阻塞這個線程,所以如果需要的話它可以分配另一個線程。

問題比範例描述更糟糕,因爲如果您有幾個阻塞操作,最終可能會阻塞線程池中的所有線程並且沒有空閒線程。如果你的所有線程都被阻塞,直到另一個演員/未來將被安排,那麼最終可能會導致死鎖。

下面是一個例子:

 
import scala.concurrent.blocking 
... 

Future { 
    val image = blocking { load_image_from_potentially_slow_media() } 
    val enhanced = image.enhance() 
    blocking { 
    if (oracle.queryBetter(image, enhanced)) { 
     write_new_image(enhanced) 
    } 
    } 
    enhanced 
} 

文檔是here