0

如何從Twitter中讀取響應數據Streaming API - POST狀態/過濾器? 我建立了連接,並收到200個狀態代碼,但我不知道如何閱讀推文。我只是想在他們來的時候發佈推文。Playframework和Twitter Streaming API

ws.url(url) 
.sign(OAuthCalculator(consumerKey, requestToken)) 
.withMethod("POST") 
.stream() 
.map { response => 
    if(response.headers.status == 200) 
    println(response.body) 
} 

編輯:我發現這個解決方案

ws.url(url) 
.sign(OAuthCalculator(consumerKey, requestToken)) 
.withMethod("POST") 
.stream() 
.map { response => 
    if(response.headers.status == 200){ 
    response.body 
     .scan("")((acc, curr) => if (acc.contains("\r\n")) curr.utf8String else acc + curr.utf8String) 
     .filter(_.contains("\r\n")) 
     .map(json => Try(parse(json).extract[Tweet])) 
     .runForeach { 
     case Success(tweet) => 
      println("-----") 
      println(tweet.text) 
     case Failure(e) => 
      println("-----") 
      println(e.getStackTrace) 
     } 
    } 
} 

回答

4

流式WS請求響應的主體是一個Akka Streams Source字節。由於Twitter Api響應以新行分隔(通常),因此您可以使用Framing.delimiter將它們拆分爲字節塊,將塊解析爲JSON,然後按照您的要求進行操作。像這樣的東西應該工作:

import akka.stream.scaladsl.Framing 
import scala.util.{Success, Try} 
import akka.util.ByteString 
import play.api.libs.json.{JsSuccess, Json, Reads} 
import play.api.libs.oauth.{ConsumerKey, OAuthCalculator, RequestToken} 

case class Tweet(id: Long, text: String) 
object Tweet { 
    implicit val reads: Reads[Tweet] = Json.reads[Tweet] 
} 

def twitter = Action.async { implicit request => 
    ws.url("https://stream.twitter.com/1.1/statuses/filter.json?track=Rio2016") 
     .sign(OAuthCalculator(consumerKey, requestToken)) 
     .withMethod("POST") 
     .stream().flatMap { response => 
    response.body 
     // Split up the byte stream into delimited chunks. Note 
     // that the chunks are quite big 
     .via(Framing.delimiter(ByteString.fromString("\n"), 20000)) 
     // Parse the chunks into JSON, and then to a Tweet. 
     // A better parsing strategy would be to account for all 
     // the different possible responses, but here we just 
     // collect those that match a Tweet. 
     .map(bytes => Try(Json.parse(bytes.toArray).validate[Tweet])) 
     .collect { 
     case Success(JsSuccess(tweet, _)) => tweet.text 
     } 
     // Print out each chunk 
     .runForeach(println).map { _ => 
     Ok("done") 
    } 
    } 
} 

注:兌現你需要一個隱含的Materializer注入到控制器的流。

+0

感謝您的解釋 – mkovacek

+0

以後可以關閉連接嗎?我打算有多個跟蹤不同單詞的請求,我希望在未來某個時間關閉特定連接? – mkovacek

+1

查看Akka文檔中的[Dynamic Stream Handling](http://doc.akka.io/docs/akka/2.4.8/scala/stream/stream-dynamic.html)。一個想法是:創建一個共享kill開關,然後用'source.via(killSwitch.flow)'將其添加到流中。在killswitch上運行'shutdown()'應該關閉連接。 – Mikesname

3

調用stream()給你回Future[StreamedResponse]。那麼您必須使用阿卡語成語來轉換ByteString區塊。是這樣的:

val stream = ws.url(url) 
    .sign(OAuthCalculator(consumerKey, requestToken)) 
    .withMethod("POST") 
    .stream() 

stream flatMap { res => 
    res.body.runWith(Sink.foreach[ByteString] { bytes => 
    println(bytes.utf8String) 
    }) 
} 

注意,我沒有測試上面的代碼(但它是基於關閉的https://www.playframework.com/documentation/2.5.x/ScalaWS流響應部分加上http://doc.akka.io/docs/akka/2.4.2/scala/stream/stream-flows-and-basics.html水槽描述的)

還指出,這將打印每大塊在自己的行,並且我不確定如果twitter API給每個塊回完整的json blob。如果要在打印塊之前累積塊,則可能需要使用Sink.fold