2017-04-05 150 views
0

I am using Spark 2.0 and trying to stream files with the wholeTextFiles API. My Spark program successfully reads the first batch of files in the folder, but I cannot stream any subsequent batches.

Please let me know how to stream files while using the wholeTextFiles API.

Here is my code:

SparkConf sparkConf = new SparkConf().setAppName("My app") 
          .setMaster("local") 
          .set("spark.driver.allowMultipleContexts", "true"); 

JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, Durations.seconds(15)); 

// Batch API: this reads the directory once on the driver, not once per streaming batch 
JavaPairRDD<String, String> wholeTextFiles = jssc.sparkContext().wholeTextFiles("C:/Users/my/files/abcd7/simple/*.txt"); 

// Map each (path, content) pair to a mySchema bean (body elided in the original post) 
JavaRDD<mySchema> schemaRDD = wholeTextFiles.map( 
    // ... 
    // return mySchema; 
); 

SQLContext hc = new HiveContext(jssc.sparkContext()); 

Dataset<Row> df = hc.createDataFrame(schemaRDD, mySchema.class); 

df.createOrReplaceTempView("myView"); 

df.show();  
jssc.start(); 
jssc.awaitTermination(); 

Spark processes the first batch of data, but no further batches. I am not using a JavaDStream here, which may be causing this error. How do I get a JavaDStream from the wholeTextFiles API?

Update with the error:

java.lang.IllegalArgumentException: requirement failed: No output operations registered, so nothing to execute 
    at scala.Predef$.require(Predef.scala:224) 
    at org.apache.spark.streaming.DStreamGraph.validate(DStreamGraph.scala:163) 
    at org.apache.spark.streaming.StreamingContext.validate(StreamingContext.scala:513) 
    at org.apache.spark.streaming.StreamingContext.liftedTree1$1(StreamingContext.scala:573) 
    at org.apache.spark.streaming.StreamingContext.start(StreamingContext.scala:572) 
    at org.apache.spark.streaming.api.java.JavaStreamingContext.start(JavaStreamingContext.scala:554) 
    at com.comcast.emm.vodip.WholeTextLocal.WholeTextLocal.main(WholeTextLocal.java:225) 
Exception in thread "main" java.lang.IllegalArgumentException: requirement failed: No output operations registered, so nothing to execute 
    at scala.Predef$.require(Predef.scala:224) 
    at org.apache.spark.streaming.DStreamGraph.validate(DStreamGraph.scala:163) 
    at org.apache.spark.streaming.StreamingContext.validate(StreamingContext.scala:513) 
    at org.apache.spark.streaming.StreamingContext.liftedTree1$1(StreamingContext.scala:573) 
    at org.apache.spark.streaming.StreamingContext.start(StreamingContext.scala:572) 
    at org.apache.spark.streaming.api.java.JavaStreamingContext.start(JavaStreamingContext.scala:554) 
+0

You are not starting the stream. However you process your data, you should start it so that Spark actually begins reading and processing the data. – Mehraban

+0

When I add jssc.start(); jssc.awaitTermination();, Spark processes the first batch of data, but no further batches. I have updated the question with the error I am getting. – AKC

Answers

0

wholeTextFiles returns an RDD. Unless you perform some action on it, Spark will not start any work.
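
For illustration, a minimal sketch (my addition, reusing the path from the question): transformations such as map are lazy, and only an action like count() triggers the actual file reads; it runs exactly once, as a batch job.

// Hedged sketch: map/filter on an RDD are lazy; only an action such as 
// count() triggers the file reads, and it runs once, not per batch. 
JavaPairRDD<String, String> files = 
        jssc.sparkContext().wholeTextFiles("C:/Users/my/files/abcd7/simple/*.txt"); 
long n = files.count(); // Spark executes the (one-off) batch job here 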

The error "No output operations registered, so nothing to execute" tells you that you are not actually using the streaming context at all.

Check the Spark documentation for examples of how to write a streaming job.
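
As a hedged sketch of what such a job could look like (not the asker's schema logic; the directory path is taken from the question): textFileStream returns a JavaDStream that picks up files added to the directory after the job starts, and foreachRDD registers the output operation the error message complains about. Note that textFileStream delivers files line by line; whole-file (path, content) semantics like wholeTextFiles would need a custom input format via fileStream.

import org.apache.spark.SparkConf; 
import org.apache.spark.streaming.Durations; 
import org.apache.spark.streaming.api.java.JavaDStream; 
import org.apache.spark.streaming.api.java.JavaStreamingContext; 

SparkConf conf = new SparkConf().setAppName("My app").setMaster("local[2]"); 
JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(15)); 

// Monitors the directory: files created there after start() form new batches 
JavaDStream<String> lines = jssc.textFileStream("C:/Users/my/files/abcd7/simple"); 

// foreachRDD is an output operation, so the DStreamGraph has work to execute 
lines.foreachRDD(rdd -> System.out.println("Records in batch: " + rdd.count())); 

jssc.start(); 
jssc.awaitTermination(); 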

+0

Even with df.show(), the job does not stream. It only processes the first batch instead of running as a streaming job. Could you provide any guidance on this? – AKC

0

According to the Spark docs, when processing a data stream you should avoid setting the master to local or local[1], because that leaves no cores free to process the data.

When running a Spark Streaming program locally, do not use "local" or "local[1]" as the master URL. Either of these means that only one thread will be used for running tasks locally. If you are using an input DStream based on a receiver (e.g. sockets, Kafka, Flume, etc.), then the single thread will be used to run the receiver, leaving no thread for processing the received data. Hence, when running locally, always use "local[n]" as the master URL, where n > number of receivers to run (see Spark Properties for information on how to set the master).
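
Applied to the code in the question, this is a one-line change (a sketch; two threads is the minimum, and more are needed if several receivers run at once):

SparkConf sparkConf = new SparkConf().setAppName("My app") 
          .setMaster("local[2]"); // at least one thread for the receiver, one for processing 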
