0
我試圖使用Apache Flink獲取Twitter流API的一些消息。Apache Flink - 無法從Twitter獲取數據
但是,我的代碼沒有在輸出文件中寫入任何內容。我正在計算特定單詞的輸入數據。
普萊舍檢查我的例子:
import java.util.Properties
import org.apache.flink.api.scala._
import org.apache.flink.streaming.connectors.twitter._
import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import com.twitter.hbc.core.endpoint.{Location, StatusesFilterEndpoint, StreamingEndpoint}
import org.apache.flink.streaming.api.windowing.time.Time
import scala.collection.JavaConverters._
//////////////////////////////////////////////////////
// Create an Endpoint to Track our terms
class myFilterEndpoint extends TwitterSource.EndpointInitializer with Serializable {
@Override
def createEndpoint(): StreamingEndpoint = {
//val chicago = new Location(new Location.Coordinate(-86.0, 41.0), new Location.Coordinate(-87.0, 42.0))
val endpoint = new StatusesFilterEndpoint()
//endpoint.locations(List(chicago).asJava)
endpoint.trackTerms(List("odebrecht", "lava", "jato").asJava)
endpoint
}
}
object Connection {
def main(args: Array[String]): Unit = {
val props = new Properties()
val params: ParameterTool = ParameterTool.fromArgs(args)
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.getConfig.setGlobalJobParameters(params)
env.setParallelism(params.getInt("parallelism", 1))
props.setProperty(TwitterSource.CONSUMER_KEY, params.get("consumer-key"))
props.setProperty(TwitterSource.CONSUMER_SECRET, params.get("consumer-key"))
props.setProperty(TwitterSource.TOKEN, params.get("token"))
props.setProperty(TwitterSource.TOKEN_SECRET, params.get("token-secret"))
val source = new TwitterSource(props)
val epInit = new myFilterEndpoint()
source.setCustomEndpointInitializer(epInit)
val streamSource = env.addSource(source)
streamSource.map(s => (0, 1))
.keyBy(0)
.timeWindow(Time.minutes(2), Time.seconds(30))
.sum(1)
.map(t => t._2)
.writeAsText(params.get("output"))
env.execute("Twitter Count")
}
}
的一點是,我沒有錯誤消息,我可以在我的儀表盤看到的。我的源是發送數據到我的TriggerWindow。但它沒有收到任何數據:
我有兩個問題在一次。
第一:爲什麼我的源發送字節到我的TriggerWindow如果沒有收到任何東西?
Seccond:我的代碼有些問題,我無法從twitter獲取數據?
第一次結果應該在2分鐘後寫出(即窗口的長度)。你等了那麼久嗎? TriggerWindow已經收到了數據,但是在43s之後,肯定不會有任何東西寫入文件。你的代碼看起來不錯。 –
嗨@DawidWysakowicz,是的,我等了那麼久。隨便我運行這個代碼2個小時。我爲這個問題拍了照片。但是Flink沒有輸出:( –