2016-02-18 166 views
3

我有火花流在Pyspark與「批量間隔」 = 30秒火花流

ssc = StreamingContext(sc, 30) 

然後我想用窗口()函數用於獲取數據的最後一小時,並把每個30秒橫跨這個數據。

kvs = KafkaUtils.createDirectStream(ssc, [topic], {"metadata.broker.list": brokers}) 

counts = kvs.map(lambda (k, v): json.loads(v))\ 
      .map(TransformInData).window(108000) 

,我已經得到了一個錯誤

16/02/18 10:23:01 INFO JobScheduler: Added jobs for time 1455790980000 ms 
16/02/18 10:23:30 INFO PythonTransformedDStream: Slicing from 1455683040000 ms to 1455791010000 ms (aligned to 1455683040000 ms and 1455791010000 ms) 
16/02/18 10:23:30 INFO PythonTransformedDStream: Time 1455790650000 ms is invalid as zeroTime is 1455790650000 ms and slideDuration is 30000 ms and difference is 0 ms 
16/02/18 10:23:31 INFO JobScheduler: Added jobs for time 1455791010000 ms 

我已閱讀本https://groups.google.com/forum/#!topic/spark-users/GQoxJHAAtX4 ,但我不明白爲什麼它不工作

+1

星火哪個版本是您使用添加offset? – sgvd

+0

1.6.0版本spark –

+0

同樣問題:我認爲它是拋出「時間無效..」作爲'timeTime-ZeroTIme'(差異)isTimeValid()'方法是Dstream類的0,這意味着'時間=零時間'因此'time <= zeroTime'返回true,導致'isTimeValid'方法返回false返回上面的消息。 –

回答

1

我有同樣的問題。升級到Spark 2.0.1修復了它。

0

是請升級到2.1星火然後 在ms添加kafka frequency = 1000,並在您Dstream()