2016-12-03 94 views
1

我有一個非常大的結果集(60k +記錄列),我從數據庫中提取並使用Anorm進行分析(儘管我可以使用play的默認數據訪問根據需要返回ResultSet的模塊)。我需要將這些結果直接轉換並傳輸到客戶端(不需要將它們保存在內存中的大列表中),然後直接將它們下載到客戶端計算機上的文件中。如何使用Play 2.5在分塊響應中將Anorm大型查詢結果流式傳輸到客戶端

我一直在參考ScalaStream 2.5.x Play文檔中分塊響應部分中演示的內容。我在實現它顯示的「getDataStream」部分時遇到了困難。

我也一直在參考ScalaAnorm 2.5.x Play文檔中的Streaming Results和Iteratee部分演示的內容。我曾嘗試通過管道將結果像這裏返回什麼枚舉:

val resultsEnumerator = Iteratees.from(SQL"SELECT * FROM Test", SqlParser.str("colName")) 

val dataContent = Source.fromPublisher(Streams.enumeratorToPublisher(resultsEnumerator)) 
Ok.chunked(dataContent).withHeaders(("ContentType","application/x-download"),("Content-disposition","attachment; filename=myDataFile.csv")) 

但生成的文件/內容是空的。

我無法找到如何將函數返回像這樣的數據服務轉換任何示例代碼或引用:

@annotation.tailrec 
def go(c: Option[Cursor], l: List[String]): List[String] = c match { 
    case Some(cursor) => { 
    if (l.size == 10000000) l // custom limit, partial processing 
    else { 
     go(cursor.next, l :+ cursor.row[String]("VBU_NUM")) 
    } 
    } 
    case _ => l 
} 

val sqlString = s"select colName FROM ${tableName} WHERE ${whereClauseStr}" 

val results : Either[List[Throwable], List[String]] = SQL(sqlString).withResult(go(_, List.empty[String])) 
results 

到的東西我可以傳遞給Ok.chunked()。

所以基本上我的問題是,我應該如何將數據庫中的每條記錄提取到一個流中,我可以將其轉換併發送到客戶端作爲可以下載到文件的分塊響應?

我不希望爲此使用Slick。但是我可以使用不使用Anorm的解決方案,只使用返回原始java.sql.ResultSet對象並使用它的播放dbApi對象。

+0

沒有必要使用轉換'Iteratee'與ANORM /播放2.5.X:https://github.com/playframework/anorm/blob/master/ docc/manual/working/scalaAnorm.md#akka-stream – cchantep

+0

@cchantep是正確的,您想要使用Akka Streams支持而不是Iteratees(在'anorm-akka' 2.5.2)。不幸的是,一些有用的部分(例如,獲取完成值以允許您關閉數據庫連接)仍在Anorm 2.6.0-SNAPSHOT中,尚未發佈。 – Mikesname

+0

對於資源管理來說,Iteratee支持並不是更好,所以這不會改變這一點,直接使用Akka支持會更簡單。 – cchantep

回答

4

在參考了Anorm Akka Support文檔以及大量的試驗和錯誤之後,我能夠實現我想要的解決方案。我不得不添加這些依賴關係

"com.typesafe.play" % "anorm_2.11" % "2.5.2", 
"com.typesafe.play" % "anorm-akka_2.11" % "2.5.2", 
"com.typesafe.akka" %% "akka-stream" % "2.4.4" 

to by build.sbt for Play 2.5。

和我實現了這樣的事情

//...play imports 
import anorm.SqlParser._ 
import anorm._ 

import akka.actor.ActorSystem 
import akka.stream.ActorMaterializer 
import akka.stream.scaladsl.{Sink, Source} 

... 

private implicit val akkaActorSystem = ActorSystem("MyAkkaActorSytem") 
private implicit val materializer = ActorMaterializer() 

def streamedAnormResultResponse() = Action { 
    implicit val connection = db.getConnection() 

    val parser: RowParser[...] = ... 
    val sqlQuery: SqlQuery = SQL("SELECT * FROM table") 

    val source: Source[Map[String, Any] = AkkaStream.source(sqlQuery, parser, ColumnAliaser.empty).alsoTo(Sink.onComplete({ 
    case Success(v) => 
     connection.close() 
    case Failure(e) => 
     println("Info from the exception: " + e.getMessage) 
     connection.close() 
    })) 

    Ok.chunked(source) 
} 
+0

像這樣@elephantopus使用'alsoTo'安全嗎?我們最想知道的是流的完成權?我想知道這種方法是否有任何副作用,如性能問題等。 – endertunc

+0

是的,它的唯一目的是在完成流時關閉連接,因爲它是異步的,對性能沒有影響(我注意到了)。雖然我沒有徹底測試過 – Elephantopus

相關問題