假設我有一個「生產者 - 消費者」問題:生產者向消費者發送消息,消費者使用Scala Futures
異步處理它們:例如future { /* do the processing */ }
。Scala生產者 - 消費者
現在假設生產者每秒產生100條消息。但消費者每秒只能處理10條消息。會發生什麼 ?我猜會有內存泄漏。將會有很多Future
對象,並且線程池的內部消息隊列也會增長。是否有意義 ?
處理它的最好方法是什麼?
假設我有一個「生產者 - 消費者」問題:生產者向消費者發送消息,消費者使用Scala Futures
異步處理它們:例如future { /* do the processing */ }
。Scala生產者 - 消費者
現在假設生產者每秒產生100條消息。但消費者每秒只能處理10條消息。會發生什麼 ?我猜會有內存泄漏。將會有很多Future
對象,並且線程池的內部消息隊列也會增長。是否有意義 ?
處理它的最好方法是什麼?
您可以設置最大隊列大小。實際上,我認爲認爲默認情況下Akka演員隊列有限,儘管我在這裏可能是錯的。
這並不能真正解決問題,但最終,如果您沒有足夠的後端參與者來執行處理,您將無法處理所有內容。
我喜歡Netflix所做的:所有請求都通過代理來監視後端的健康狀況。如果後端花費的時間過長,則會丟棄請求並提供回退:或者是合理的默認值或錯誤消息。他們談論了很多關於他們的架構,例如this presentation。
謝謝。假設現在我在生產者和消費者之間有一個有限的隊列。它解決了問題嗎?你不會改變消費者邏輯嗎? – Michael
有多個消費者 - 使用演員池。根據泳池的壓力,您可以動態調整尺寸。見http://doc.akka.io/docs/akka/snapshot/scala/routing.html
謝謝。假設我不能添加更多的演員/線程。例如,處理是CPU限制的,我只有一個CPU。 – Michael
然後你會耗盡你的資源。如果你生產更多,那麼你可以消耗,你將不得不放棄消息在某個時刻(使用與丹尼爾建議的大小限制的有界隊列)。或者使用更多的資源,也許是一種集羣方法(參見http://doc.akka.io/docs/akka/snapshot/common/cluster.html) –
假設我只有一臺主機,只有一個CPU。假設生產者和消費者平均工作得很好。問題是處理_bursts_。如果爆發比計劃更大,則允許消費者丟棄一些消息。我知道如何用線程實現它並阻止有界的消息隊列。現在我想知道如何使用'scala.concurrent.future'來實現。 – Michael
在阿卡,執行上下文中使用,但似乎沒有郵箱 - 這將是值得閱讀的源代碼,但我可以通過實驗來回答你的問題:
未來的不具有「郵箱「和我不是100%肯定正是阿卡執行上下文實際上包含發動機罩或下什麼做的,但我們可以看到,阿卡會出現內存不足的直接使用期貨時:
scala> import scala.concurrent.Future
import scala.concurrent.Future
scala> import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.ExecutionContext.Implicits.global ^
scala> while(1==1) Future(Thread.sleep(100))
java.lang.OutOfMemoryError: Java heap space
如果我們」重新討論消息,然後有一個郵箱描述了參與者消息隊列的行爲ld填充,因爲一次只處理一條消息) - 我將在下面解釋這一點。
假設一個有界的郵箱(例如一個大小限制的郵箱)會發生什麼消息。 答案取決於郵箱。 首先,有界郵箱有一些設置,如大小限制:
bounded-mailbox {
mailbox-type = "akka.dispatch.BoundedMailbox"
mailbox-capacity = 1000
mailbox-push-timeout-time = 10s
}
現在,當這個限制被擊中,阿卡要麼刪除舊的或者根據郵箱的配置方式新的信息 - 例如用此設置
# whether to drop older items (instead of newer) when the queue is full
discard-old-when-full = on
顯然,如果有喜歡的內存不足然後您的應用程序可能會崩潰意味着該消息將丟失,因爲它們存儲在內存中的其他資源問題。無界郵箱將繼續堆疊郵件,直到出現錯誤情況,這就是爲什麼您可能想使用有界郵箱。
如果錯誤情況下的消息丟失是不可取的,還有另一種選擇=可以使用持久郵箱,它將消息存儲在更持久的地方,例如文件中。以下是一個示例郵箱配置,它使用文件提供更持久的郵件存儲。
akka {
actor {
mailbox {
file-based {
# directory below which this queue resides
directory-path = "./_mb"
# attempting to add an item after the queue reaches this size (in items)
# will fail.
max-items = 2147483647
# attempting to add an item after the queue reaches this size (in bytes)
# will fail.
max-size = 2147483647 bytes
# attempting to add an item larger than this size (in bytes) will fail.
max-item-size = 2147483647 bytes
# maximum expiration time for this queue (seconds).
max-age = 0s
# maximum journal size before the journal should be rotated.
max-journal-size = 16 MiB
# maximum size of a queue before it drops into read-behind mode.
max-memory-size = 128 MiB
# maximum overflow (multiplier) of a journal file before we re-create it.
max-journal-overflow = 10
# absolute maximum size of a journal file until we rebuild it,
# no matter what.
max-journal-size-absolute = 9223372036854775807 bytes
# whether to drop older items (instead of newer) when the queue is full
discard-old-when-full = on
# whether to keep a journal file at all
keep-journal = on
# whether to sync the journal after each transaction
sync-journal = off
# circuit breaker configuration
circuit-breaker {
# maximum number of failures before opening breaker
max-failures = 3
# duration of time beyond which a call is assumed to be timed out and
# considered a failure
call-timeout = 3 seconds
# duration of time to wait until attempting to reset the breaker during
# which all calls fail-fast
reset-timeout = 30 seconds
}
}
}
}
}
注 - 我很困惑 - 這個問題是關於期貨而不是演員。 – JasonG