2016-12-27 77 views
0

您好我是新來的Python Spark和我嘗試從星火github上這個例子中,爲了在給定的目錄中創建新的文本文件計數的話:Pyspark - FileInputDStream:錯誤尋找新的文件

import sys 

from pyspark import SparkContext 
from pyspark.streaming import StreamingContext 

if __name__ == "__main__": 
if len(sys.argv) != 2: 
    print("Usage: hdfs_wordcount.py <directory>", file=sys.stderr) 
    exit(-1) 

sc = SparkContext(appName="PythonStreamingHDFSWordCount") 
ssc = StreamingContext(sc, 1) 

lines = ssc.textFileStream("hdfs:///home/my-logs/") 
counts = lines.flatMap(lambda line: line.split(" "))\ 
       .map(lambda x: (x, 1))\ 
       .reduceByKey(lambda a, b: a+b) 
counts.pprint() 

ssc.start() 
ssc.awaitTermination() 

而這就是我得到: a warning saying : WARN FileInputDStream: Error finding new files

一個警告消息說:WARN FileInputDStream: Error finding new files

,我得到空的結果,即使我在這個目錄中添加文件:/

這個任何建議的解決方案? 謝謝。

回答

0

我想你是指this的例子。你是否能夠在不修改的情況下運行它,因爲我看到你在程序中將目錄設置爲「hdfs:///」?你可以像下面那樣運行這個例子。

例如Spark在/opt/spark-2.0.2-bin-hadoop2.7。您可以運行hdfs_wordcount.py在下面的示例目錄中可用。我們使用/tmp作爲目錄作爲參數傳遞給程序。現在

[email protected]:/opt/spark-2.0.2-bin-hadoop2.7$ bin/spark-submit examples/src/main/python/streaming/hdfs_wordcount.py /tmp 

這個程序運行時,打開另一個終端和一些文件複製到文件夾/tmp

[email protected]:~$ cp test.txt /tmp 

你會看到在第一終端的字數。

+0

嗨@abaghel,這只是一個錯誤,而打字(我更新了我的問題),我使用完全相同的例子,但我總是空的結果,你認爲我應該** spark-2.xx-bin-hadoop2.7 **而不是將它從* github *中拉出來? –

+0

您可以先嚐試從https://github.com/apache/spark/blob/master/examples/src/main/python/streaming/hdfs_wordcount.py運行示例,而不是在程序中設置自定義目錄路徑。流處理的結果將在其他終端快速滾動,因此只要在/ tmp目錄中添加文件,就必須立即監視它。 – abaghel

+0

它適用於我現在只改變一些構建參數:D,其他問題,你認爲我可以做一些事情,即使在文件更改(附加在相同的日誌)時,也可以產生流動? –

0

解決!

的問題是身材,我用它來構建等,這取決於他們的自述文件從github上是使用maven:

build/mvn -DskipTests clean package

我已經構建方式取決於他們documentation

build/mvn -Pyarn -Phadoop-2.4 -Dhadoop.version=2.4.0 -DskipTests clean package

有人知道這些參數是什麼?