2016-11-22 23 views
2

我剛開始使用異步數據庫驅動程序reactivecouchbase,但遇到了一些基本的設計問題。 在傳統的方法中,我會通過限制連接數來限制數據庫施加的壓力。然而,使用異步驅動程序,我可以用新的查詢來淹沒數據庫?如何在akka流地圖中使用異步驅動程序vs mapAsync

如果這已成爲重要的是在一個例子如下。

可以說我有兩種不同的方式調用數據庫。

我的函數調用DB:

asyncCallDB: Future[DBResponse] 
blockingCallDB: DBResponse 

現在我想分貝呼籲過流,我可以使用兩種不同的功能映射:

Flow.map() 
Flow.mapAsync(numberOfConcurrentCalls)() 

現在我的問題是你會如何選擇調用數據庫:

Flow.map(blockingCallDB) //One call at a time with back preassure 
Flow.map(asyncCallDB) //Unlimited calls floods db no back pressure? 

Flow.mapAsync(numberOfConcurrentCalls)(blockingCallDB) //Up to numberOfConcurrentCalls at the same time with back pressure 
Flow.mapAsync(numberOfConcurrentCalls)(asyncCallDB) //Unlimited calls floods db no back pressure? 

我覺得我understading缺少這裏,想下適應這種類型的決定。

+1

[地圖和mapAsync之間的差異]的可能重複(http://stackoverflow.com/questions/35146418/difference-between-map-and-mapasync) –

+0

你確定你可以調用'Flow.mapAsync'函數返回'DBResponse'而不是'Future [DBResponse]'?根據[Akka API doc](http://doc.akka.io/api/akka/2.4/index.html#[email protected]%5BT%5D(parallelism:Int)(f:Out => scala.concurrent.Future%5BT%5D):FlowOps.this.Repr%5BT%5D)這應該是不可能的。 –

回答

3

ReactiveCouchbase使用AsyncHttpClient與Couchbase服務器進行通信。正如你可以see in the source code它調用setMaximumConnectionsTotal,這限制了併發連接的數量。實際值取決於您在couchbase.http.maxTotalConnections中配置的內容。

您創建的每個CouchbaseBucket都有一個AsyncHttpClient。所以每個CouchbaseBucket最多有maxTotalConnections連接。

Couchbase documentation on N1QL REST API

的REST API運行同步,所以一旦在 語句的執行請求被啓動,結果流返回給客戶端, 終止語句完成的執行。

因此,在實踐中,每個存儲桶的併發查詢數限制爲maxTotalConnections

因此,DB上的反壓總是以某種方式受到限制。因爲您將maxTotalConnections設置爲非負數,或者因爲您的客戶端由於RAM或文件描述符的數量有限而無法創建更多連接。

但是,仍然有可能創建太多Future s,以至於您的客戶端將耗盡內存。無論何時您認爲可能出現這種情況,您應該使用mapAsync,如in this answer"Buffers and working with rate" (Akka documentation)中提到的其他技術之一。

有一個good description of mapAsync in the Akka documentation

通行證進入的元素,返回一個Future結果的功能。 當 未來到達時,結果傳遞到下游。最多n個元素可以同時處理 ...

記住Flow.mapAsync將不會運行本身任何東西,它只是返回Flow,你必須一個SourceSink之間的連接,然後runAkka Quick Start Guide以非常理解的方式描述了這一點。

+0

是的,但如果您正在使用阻止請求,地圖異步纔有意義? – user3139545

+0

@ user3139545那麼,你需要運行流,對吧?所以你需要連接某種接收器到你的流量。水槽將不會被處理,直到您的流程產生一些結果。 'Flow.mapAsync'本身不會做任何事情,它只是返回另一個'Flow'([doc](http://doc.akka.io/api/akka/2.4/index.html#akka.stream.scaladsl流式細胞@ mapAsync [T](並行的:int)(F:時間=> scala.concurrent.Future [T]):FlowOps.this.Repr [T]))。 –

+0

我發現了一個Couchbase文檔片段,其中提到查詢確實是同步的:[「REST API同步運行,因此一旦請求中的語句執行開始,結果就會回送到客戶端,當語句執行完成時終止。「](http://developer.couchbase.com/documentation/server/4.5/n1ql/n1ql-rest-api/index.html) –

相關問題