2016-03-14 42 views
1

我遇到以下問題。 我正在查詢某個服務器的某些數據並將其重新獲取爲HttpEntity.Chunk。 字符串看起來像高達10.000.000行這樣的迴應:將HttpEntity.Chunk轉換爲數組[String]

[{"name":"param1","value":122343,"time":45435345}, 
{"name":"param2","value":243,"time":4325435}, 
......] 

現在我想要得到的輸入數據到和陣列[字符串]其中每個String是從響應一條線,因爲後來它應該被導入到apache spark數據框中。 目前我做它喜歡這樣:

//For the http request 
trait StartHttpRequest { 
    implicit val system: ActorSystem 
    implicit val materializer: ActorMaterializer 

    def httpRequest(data: String, path: String, targetPort: Int, host: String): Future[HttpResponse] = { 
    val connectionFlow: Flow[HttpRequest, HttpResponse, Future[OutgoingConnection]] = { 
     Http().outgoingConnection(host, port = targetPort) 
    } 
    val responseFuture: Future[HttpResponse] = 
     Source.single(RequestBuilding.Post(uri = path, entity = HttpEntity(ContentTypes.`application/json`, data))) 
     .via(connectionFlow) 
     .runWith(Sink.head) 
    responseFuture 
    } 
} 

//result of the request 
val responseFuture: Future[HttpResponse] = httpRequest(.....) 

//convert to string 
responseFuture.flatMap { response => 
     response.status match { 
      case StatusCodes.OK => 
      Unmarshal(response.entity).to[String] 
    } 
} 

//and then something like this, but with even more stupid stuff 
responseFuture.onSuccess { str:String => 
    masterActor! str.split("""\},\{""") 
} 

我的問題是,這將是一個更好的方式來獲得結果到一個數組? 如何直接解組響應實體?因爲.to [Array [String]]例如不起作用。而且因爲有太多的線路要來了,我可以用一條流來做,更有效率嗎?

回答

1

回答你的問題出順序:

我怎麼能直接解組響應實體?

有一個existing question & answer有關unmarshalling數組的案例類。

什麼會是一個更好的方式來獲得結果到一個數組?

我會利用分塊性質和使用流。這使您可以同時執行字符串處理和json解析。

首先你需要一個容器類和解析器:

case class Data(name : String, value : Int, time : Long) 

object MyJsonProtocol extends DefaultJsonProtocol { 
    implicit val dataFormat = jsonFormat3(Data) 
} 

然後,你必須做一些操作,以獲得JSON對象看的權利:

//Drops the '[' and the ']' characters 
val dropArrayMarkers = 
    Flow[ByteString].map(_.filterNot(b => b == '['.toByte || b == ']'.toByte)) 

val preppendBrace = 
    Flow[String].map(s => if(!s.startsWith("{")) "{" + s else s) 

val appendBrace = 
    Flow[String].map(s => if(!s.endsWith("}")) s + "}" else s) 

val parseJson = 
    Flow[String].map(_.parseJson.convertTo[Data]) 

最後,結合這些流量轉換一個字節源的數據對象的來源:

def strSourceToDataSource(source : Source[ByteString,_]) : Source[Data, _] = 
    source.via(dropArrayMarkers) 
     .via(Framing.delimiter(ByteString("},{"), 256, true)) 
     .map(_.utf8String) 
     .via(prependBrace) 
     .via(appendBrace) 
     .via(parseJson) 

這個sou然後可以將數據對象排除爲Seq

val dataSeq : Future[Seq[Data]] = 
    responseFuture flatMap { response => 
    response.status match { 
     case StatusCodes.OK => 
     strSourceToDataSource(response.entity.dataBytes).runWith(Sink.seq) 
    } 
    } 
+0

哇謝謝您的詳細解答。我的當前溶液做這樣的: '情況下類元素(名稱:字符串,值:中等,時間:智力)' 和 'responseFuture.flatMap {響應=> response.status匹配{ 情況下StatusCodes.OK => Unmarshal(response.entity).to [Seq [Element]] } }' }但我認爲流式傳輸解決方案更有意義,所以我會試試。你碰巧也使用apache spark嗎?因爲我可能有後續問題。謝謝! – rincewind

+0

http://stackoverflow.com/questions/36020699/getting-an-apache-spark-dataframe-in-the-right-format – rincewind

+0

@rincewind歡迎您。我會看看另一個問題。快樂的黑客攻擊。 –

相關問題