2
我正在使用Alpakka-FTP,但也許我正在尋找一個通用的akka-stream模式。該FTP連接器可以列出文件或檢索它們:理想Akka Streams,源項目作爲另一個來源?
def ls(host: String): Source[FtpFile, NotUsed]
def fromPath(host: String, path: Path): Source[ByteString, Future[IOResult]]
,我想創建這樣一個流:
LIST
.FETCH_ITEM
.FOREACH(do something)
但我無法創建具有兩個功能,例如流我上面寫過。我覺得我應該能到那裏使用Flow
,像
Ftp.ls
.via(some flow that uses the Ftp.fromPath above)
.runWith(Sink.foreach(do something))
這是可能的,上面只有ls
和fromPath
功能給?
編輯:
我能解決它使用一個演員和mapAsync
,但我還是覺得應該更簡單。
class Downloader extends Actor {
override def receive = {
case ftpFile: FtpFile =>
Ftp.fromPath(Paths.get(ftpFile.path), settings)
.toMat(FileIO.toPath(Paths.get("testHDF.txt")))(Keep.right)
.run() pipeTo sender
}
}
val downloader = as.actorOf(Props(new Downloader))
Ftp.ls("test_path", settings)
.mapAsync(1)(ftpFile => (downloader ? ftpFile) (3.seconds).mapTo[IOResult])
.runWith(Sink.foreach(res => println("got it!" + res)))