我經常回到純粹的功能,非阿卡,對於像這樣的問題的技術,然後「升降機」這些功能集成到阿卡結構。我的意思是我儘量只使用Scala的「東西」,然後嘗試換阿卡以後裏面的東西......
文件創建
基於「隨機生成FileOutputStream
創建啓動名稱「:
def randomFileNameGenerator : String = ??? //not specified in question
import java.io.FileOutputStream
val randomFileOutGenerator :() => FileOutputStream =
() => new FileOutputStream(randomFileNameGenerator)
國家
需要有存儲的某種方式‘當前文件的狀態’,例如,字節數已經寫好:
case class FileState(byteCount : Int = 0,
fileOut : FileOutputStream = randomFileOutGenerator())
文件寫入
首先我們確定我們是否會違反與指定ByteString
的最大文件大小閾值:
import akka.util.ByteString
val isEndOfChunk : (FileState, ByteString, Int) => Boolean =
(state, byteString, maxBytes) =>
state.byteCount + byteString.length > maxBytes
那麼我們有編寫新的函數,如果我們已經清除了當前函數的容量,或者返回當前狀態(如果它仍然低於容量),則創建新的FileState
:
val closeFileInState : FileState => Unit =
(_ : FileState).fileOut.close()
val getCurrentFileState(FileState, ByteString, Int) => FileState =
(state, byteString, maxBytes) =>
if(isEndOfChunk(maxBytes, state, byteString)) {
closeFileInState(state)
FileState()
}
else
state
剩下的唯一一件事就是寫FileOutputStream
:
val writeToFileAndReturn(FileState, ByteString) => FileState =
(fileState, byteString) => {
fileState.fileOut write byteString.toArray
fileState copy (byteCount = fileState.byteCount + byteString.size)
}
//the signature ordering will become useful
def writeToChunkedFile(maxBytes : Int)(fileState : FileState, byteString : ByteString) : FileState =
writeToFileAndReturn(getCurrentFileState(maxBytes, fileState, byteString), byteString)
折任何GenTraversableOnce
Scala中的一個GenTraversableOnce
的任何集合,平行與否,具有摺疊運算符。這些包括迭代器,矢量,數組,Seq,Scala流......個最終writeToChunkedFile
功能完全匹配的GenTraversableOnce#fold簽名:
val anyIterable : Iterable = ???
val finalFileState = anyIterable.fold(FileState())(writetochunkedFile(maxBytes))
最後一個鬆散端;最後的FileOutputStream
也需要關閉。由於折只會發出,去年FileState
我們可以關閉一個:
closeFileInState(finalFileState)
阿卡流
阿卡流量獲取其fold
從FlowOps#fold這恰好符合GenTraversableOnce
簽名。因此,我們可以「提升」我們的常規功能爲類似的方式流值,我們使用Iterable
倍:
import akka.stream.scaladsl.Flow
def chunkerFlow(maxBytes : Int) : Flow[ByteString, FileState, _] =
Flow[ByteString].fold(FileState())(writeToChunkedFile(maxBytes))
約定期功能處理問題的好處是,他們可以超越流以外的異步框架內使用,例如期貨或演員。你也不需要在單元測試中使用akka ActorSystem
,只需要定期的語言數據結構。
import akka.stream.scaladsl.Sink
import scala.concurrent.Future
def byteStringSink(maxBytes : Int) : Sink[ByteString, _] =
chunkerFlow(maxBytes) to (Sink foreach closeFileInState)
然後,您可以使用此Sink
排HttpEntity
從HttpRequest
到來。
哇謝謝,這是一個非常詳細的迴應!沒有那樣做。在'GraphStageLogic'中,這種方法與'fold'之間的任何區別,並且具有相同的邏輯(保持狀態並手動創建輸出流)?例如(根據您的評論+我以前的鏈接)http://pastebin.com/tzLFAmzk?最小的優點是,它允許創建文件時儘快創建文件(但代碼更長,更容易出錯)。 – Vuzi
@Vuzi歡迎您。我在「真實世界」中看到的方法和「GraphStageLogic」之間的主要區別在於測試。通過我的函數,所有的測試都可以在沒有'ActorSystem','ActorMaterializer'和'ExecutionContext'的情況下完成。如果你把你的邏輯放在akka流構造中,那麼你需要所有的akka框架來測試這個邏輯。快樂的黑客攻擊。 –