2016-12-30 49 views
3

我試圖將一個傳入的Akka字節流(來自http請求的正文,但也可能來自一個文件)分成多個定義的文件尺寸。Akka流 - 將一串ByteString分割爲多個文件

例如,如果我正在上傳10Gb文件,它會創建類似10G的1個文件。這些文件會有隨機生成的名稱。我的問題是,我不知道從哪裏開始,因爲我讀過的所有響應和示例都將整個塊存儲到內存中,或者使用基於字符串的分隔符。除非我真的不能擁有1Gb的「塊」,然後就把它們寫到磁盤上。

有沒有容易方法來執行那種操作?我唯一的想法是使用類似http://doc.akka.io/docs/akka/2.4/scala/stream/stream-cookbook.html#Chunking_up_a_stream_of_ByteStrings_into_limited_size_ByteStrings的東西,但轉換爲FlowShape[ByteString, File]之類的東西,將自己寫入文件大塊直到達到最大文件大小,然後創建一個新文件等,然後流回創建的文件。它看起來像沒有使用正確阿卡一個窮兇極惡的想法..提前

感謝

回答

6

我經常回到純粹的功能,非阿卡,對於像這樣的問題的技術,然後「升降機」這些功能集成到阿卡結構。我的意思是我儘量只使用Scala的「東西」,然後嘗試換阿卡以後裏面的東西......

文件創建

基於「隨機生成FileOutputStream創建啓動名稱「:

def randomFileNameGenerator : String = ??? //not specified in question 

import java.io.FileOutputStream 

val randomFileOutGenerator :() => FileOutputStream = 
() => new FileOutputStream(randomFileNameGenerator) 

國家

需要有存儲的某種方式‘當前文件的狀態’,例如,字節數已經寫好:

case class FileState(byteCount : Int = 0, 
        fileOut : FileOutputStream = randomFileOutGenerator()) 

文件寫入

首先我們確定我們是否會違反與指定ByteString的最大文件大小閾值:

import akka.util.ByteString 

val isEndOfChunk : (FileState, ByteString, Int) => Boolean = 
    (state, byteString, maxBytes) => 
    state.byteCount + byteString.length > maxBytes 

那麼我們有編寫新的函數,如果我們已經清除了當前函數的容量,或者返回當前狀態(如果它仍然低於容量),則創建新的FileState

val closeFileInState : FileState => Unit = 
    (_ : FileState).fileOut.close() 

val getCurrentFileState(FileState, ByteString, Int) => FileState = 
    (state, byteString, maxBytes) => 
    if(isEndOfChunk(maxBytes, state, byteString)) { 
     closeFileInState(state) 
     FileState() 
    } 
    else 
     state 

剩下的唯一一件事就是寫FileOutputStream

val writeToFileAndReturn(FileState, ByteString) => FileState = 
    (fileState, byteString) => { 
    fileState.fileOut write byteString.toArray 
    fileState copy (byteCount = fileState.byteCount + byteString.size) 
    } 

//the signature ordering will become useful 
def writeToChunkedFile(maxBytes : Int)(fileState : FileState, byteString : ByteString) : FileState =  
    writeToFileAndReturn(getCurrentFileState(maxBytes, fileState, byteString), byteString)  

折任何GenTraversableOnce

Scala中的一個GenTraversableOnce的任何集合,平行與否,具有摺疊運算符。這些包括迭代器,矢量,數組,Seq,Scala流......個最終writeToChunkedFile功能完全匹配的GenTraversableOnce#fold簽名:

val anyIterable : Iterable = ??? 

val finalFileState = anyIterable.fold(FileState())(writetochunkedFile(maxBytes)) 

最後一個鬆散端;最後的FileOutputStream也需要關閉。由於折只會發出,去年FileState我們可以關閉一個:

closeFileInState(finalFileState) 

阿卡流

阿卡流量獲取其foldFlowOps#fold這恰好符合GenTraversableOnce簽名。因此,我們可以「提升」我們的常規功能爲類似的方式流值,我們使用Iterable倍:

import akka.stream.scaladsl.Flow 

def chunkerFlow(maxBytes : Int) : Flow[ByteString, FileState, _] = 
    Flow[ByteString].fold(FileState())(writeToChunkedFile(maxBytes)) 

約定期功能處理問題的好處是,他們可以超越流以外的異步框架內使用,例如期貨或演員。你也不需要在單元測試中使用akka ActorSystem,只需要定期的語言數據結構。

import akka.stream.scaladsl.Sink 
import scala.concurrent.Future 

def byteStringSink(maxBytes : Int) : Sink[ByteString, _] = 
    chunkerFlow(maxBytes) to (Sink foreach closeFileInState) 

然後,您可以使用此SinkHttpEntityHttpRequest到來。

+0

哇謝謝,這是一個非常詳細的迴應!沒有那樣做。在'GraphStageLogic'中,這種方法與'fold'之間的任何區別,並且具有相同的邏輯(保持狀態並手動創建輸出流)?例如(根據您的評論+我以前的鏈接)http://pastebin.com/tzLFAmzk?最小的優點是,它允許創建文件時儘快創建文件(但代碼更長,更容易出錯)。 – Vuzi

+0

@Vuzi歡迎您。我在「真實世界」中看到的方法和「GraphStageLogic」之間的主要區別在於測試。通過我的函數,所有的測試都可以在沒有'ActorSystem','ActorMaterializer'和'ExecutionContext'的情況下完成。如果你把你的邏輯放在akka流構造中,那麼你需要所有的akka​​框架來測試這個邏輯。快樂的黑客攻擊。 –

1

您可以編寫自定義圖形階段。 您的問題類似於在上傳到亞馬遜S3期間在alpakka中遇到的問題。 (谷歌alpakka s3連接器..他們不會讓我發佈超過2個鏈接)

但由於某些原因,s3連接器DiskBuffer寫入整個傳入來源的字節串到文件,然後發出該塊進行進一步的流處理..

我們想要的東西類似於limit a source of byte strings to specific length。在這個例子中,他們通過維護一個內存緩衝區,將傳入的Source [ByteString,_]限制爲一個固定大小的byteStrings的源。我採用它來處理文件。 這樣做的好處是您可以使用專用線程池來執行阻塞IO。要獲得良好的反應流,您希望阻止IO在actor系統中的單獨線程池中。 PS:這不會嘗試生成確切大小的文件..所以如果我們在100MB文件中讀取2KB的額外文件,我們會將這些額外的字節寫入當前文件,而不是嘗試實現確切的大小。

import java.io.{FileOutputStream, RandomAccessFile} 
import java.nio.channels.FileChannel 
import java.nio.file.Path 

import akka.stream.stage.{GraphStage, GraphStageLogic, InHandler, OutHandler} 
import akka.stream._ 
import akka.util.ByteString 

case class MultipartUploadChunk(path: Path, size: Int, partNumber: Int) 
//Starts writing the byteStrings received from upstream to a file. Emits a path after writing a partSize number of bytes. Does not attemtp to write exact number of bytes. 
class FileChunker(maxSize: Int, tempDir: Path, partSize: Int) 
    extends GraphStage[FlowShape[ByteString, MultipartUploadChunk]] { 

    assert(maxSize > partSize, "Max size should be larger than part size. ") 

    val in: Inlet[ByteString] = Inlet[ByteString]("PartsMaker.in") 
    val out: Outlet[MultipartUploadChunk] = Outlet[MultipartUploadChunk]("PartsMaker.out") 

    override val shape: FlowShape[ByteString, MultipartUploadChunk] = FlowShape.of(in, out) 

    override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = 
    new GraphStageLogic(shape) with OutHandler with InHandler { 

     var partNumber: Int = 0 
     var length: Int = 0 
     var currentBuffer: Option[PartBuffer] = None 

     override def onPull(): Unit = 
     if (isClosed(in)) { 
      emitPart(currentBuffer, length) 
     } else { 
      pull(in) 
     } 

     override def onPush(): Unit = { 
     val elem = grab(in) 
     length += elem.size 
     val currentPart: PartBuffer = currentBuffer match { 
      case Some(part) => part 
      case None => 
      val newPart = createPart(partNumber) 
      currentBuffer = Some(newPart) 
      newPart 
     } 
     currentPart.fileChannel.write(elem.asByteBuffer) 
     if (length > partSize) { 
      emitPart(currentBuffer, length) 
      //3. .increment part number, reset length. 
      partNumber += 1 
      length = 0 
     } else { 
      pull(in) 
     } 
     } 

     override def onUpstreamFinish(): Unit = 
     if (length > 0) emitPart(currentBuffer, length) // emit part only if something is still left in current buffer. 

     private def emitPart(maybePart: Option[PartBuffer], size: Int): Unit = maybePart match { 
     case Some(part) => 
      //1. flush the part buffer and truncate the file. 
      part.fileChannel.force(false) 
      //   not sure why we do this truncate.. but was being done in alpakka. also maybe safe to do. 
//     val ch = new FileOutputStream(part.path.toFile).getChannel 
//   try { 
//   println(s"truncating to size $size") 
//   ch.truncate(size) 
//   } finally { 
//   ch.close() 
//   } 
      //2emit the part 
      val chunk = MultipartUploadChunk(path = part.path, size = length, partNumber = partNumber) 
      push(out, chunk) 
      part.fileChannel.close() // TODO: probably close elsewhere. 
      currentBuffer = None 
      //complete stage if in is closed. 
      if (isClosed(in)) completeStage() 
     case None => if (isClosed(in)) completeStage() 
     } 

     private def createPart(partNum: Int): PartBuffer = { 
     val path: Path = partFile(partNum) 
     //currentPart.deleteOnExit() //TODO: Enable in prod. requests that the file be deleted when VM dies. 
     PartBuffer(path, new RandomAccessFile(path.toFile, "rw").getChannel) 
     } 

     /** 
     * Creates a file in the temp directory with name bmcs-buffer-part-$partNumber 
     * @param partNumber the part number in multipart upload. 
     * @return 
     * TODO:add unique id to the file name. for multiple 
     */ 
     private def partFile(partNumber: Int): Path = 
     tempDir.resolve(s"bmcs-buffer-part-$partNumber.bin") 
     setHandlers(in, out, this) 
    } 

    case class PartBuffer(path: Path, fileChannel: FileChannel) //TODO: see if you need mapped byte buffer. might be ok with just output stream/channel. 

}