1
我想使用reactivemongo-akkastream 0.12.1從Mongodb流式傳輸數據,並將結果返回到其中一條路徑(使用Akka-http)的CSV流中。 我沒有實現,這裏繼爲例:使用AKKA-HTTP流式CSV源
,它看起來做工精細。
我現在面臨的唯一問題是如何將標題添加到輸出CSV文件。有任何想法嗎?
感謝
我想使用reactivemongo-akkastream 0.12.1從Mongodb流式傳輸數據,並將結果返回到其中一條路徑(使用Akka-http)的CSV流中。 我沒有實現,這裏繼爲例:使用AKKA-HTTP流式CSV源
,它看起來做工精細。
我現在面臨的唯一問題是如何將標題添加到輸出CSV文件。有任何想法嗎?
感謝
除了一個事實,即這個例子是不是真的生成CSV的穩健方法(不提供適當的轉義),則需要返工有點添加標題。這是我會做:
Flow
到Source[Tweet]
轉換爲CSV行,例如源一個Source[List[String]]
List[String]
這裏的來源是一些示例代碼:
case class Tweet(uid: String, txt: String)
def getTweets: Source[Tweet, NotUsed] = ???
val tweetToRow: Flow[Tweet, List[String], NotUsed] =
Flow[Tweet].map { t =>
List(
t.uid,
t.txt.replaceAll(",", "."))
}
// provide a marshaller from a row (List[String]) to a ByteString
implicit val tweetAsCsv = Marshaller.strict[List[String], ByteString] { row =>
Marshalling.WithFixedContentType(ContentTypes.`text/csv(UTF-8)`,() =>
ByteString(row.mkString(","))
)
}
// enable csv streaming
implicit val csvStreaming = EntityStreamingSupport.csv()
val route = path("tweets") {
val headers = Source.single(List("uid", "text"))
val tweets: Source[List[String], NotUsed] = getTweets.via(tweetToRow)
complete(headers.concat(tweets))
}
更新:如果您的getTweets
方法返回Future
,則可以僅通過其源值映射並預先設置標題,例如:
val route = path("tweets") {
val headers = Source.single(List("uid", "text"))
val rows: Future[Source[List[String], NotUsed]] = getTweets
.map(tweets => headers.concat(tweets.via(tweetToRow)))
complete(rows)
}
Thanks Mikesname !!最後一個問題,我實際上是一個akka actor,它以這種方式從mongodb返回數據:(MyView?GetStuff())。mapTo [Source [MyObject,NotUsed]] - 如何將它重定向到tweetToRow方法它返回一個未來。謝謝 –
@HasseneBenAmara更新了答案 – Mikesname
再次感謝您的幫助。實際上,via()函數看起來不正確,這是我在構建項目時得到的...錯誤:(155,87)類型不匹配; found:akka.stream.scaladsl.Flow [MyObject,List [String],akka.NotUsed] 必需:akka.stream.Graph [akka.stream.FlowShape [Product with java.io.Serializable ,?] ,?] 我仔細檢查了通過簽名,但不知道如何重構我的代碼,以使其工作。我還是新來的阿卡流和斯卡拉,所以真的很感謝你的幫助! –