您好我是新來的Python Spark和我嘗試從星火github上這個例子中,爲了在給定的目錄中創建新的文本文件計數的話:Pyspark - FileInputDStream:錯誤尋找新的文件
import sys
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
if __name__ == "__main__":
if len(sys.argv) != 2:
print("Usage: hdfs_wordcount.py <directory>", file=sys.stderr)
exit(-1)
sc = SparkContext(appName="PythonStreamingHDFSWordCount")
ssc = StreamingContext(sc, 1)
lines = ssc.textFileStream("hdfs:///home/my-logs/")
counts = lines.flatMap(lambda line: line.split(" "))\
.map(lambda x: (x, 1))\
.reduceByKey(lambda a, b: a+b)
counts.pprint()
ssc.start()
ssc.awaitTermination()
而這就是我得到: a warning saying : WARN FileInputDStream: Error finding new files
一個警告消息說:WARN FileInputDStream: Error finding new files
。
,我得到空的結果,即使我在這個目錄中添加文件:/
這個任何建議的解決方案? 謝謝。
嗨@abaghel,這只是一個錯誤,而打字(我更新了我的問題),我使用完全相同的例子,但我總是空的結果,你認爲我應該** spark-2.xx-bin-hadoop2.7 **而不是將它從* github *中拉出來? –
您可以先嚐試從https://github.com/apache/spark/blob/master/examples/src/main/python/streaming/hdfs_wordcount.py運行示例,而不是在程序中設置自定義目錄路徑。流處理的結果將在其他終端快速滾動,因此只要在/ tmp目錄中添加文件,就必須立即監視它。 – abaghel
它適用於我現在只改變一些構建參數:D,其他問題,你認爲我可以做一些事情,即使在文件更改(附加在相同的日誌)時,也可以產生流動? –