I am using Spark 2.0 and trying to stream files with the wholeTextFiles API. My Spark program successfully reads the first batch of files in the folder, but I cannot stream any subsequent batches.
Please let me know how to stream files while using the wholeTextFiles API.
Here is my code:
SparkConf sparkConf = new SparkConf().setAppName("My app")
.setMaster("local")
.set("spark.driver.allowMultipleContexts", "true");
JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, Durations.seconds(15));
JavaPairRDD<String, String> wholeTextFiles = jssc.sparkContext().wholeTextFiles("C:/Users/my/files/abcd7/simple/*.txt");
JavaRDD<String> stringRDD = wholeTextFiles.map(
    // ----- mapping logic elided by the asker -----
    return mySchema;);
SQLContext hc = new HiveContext(jssc.sparkContext());
Dataset<Row> df = hc.createDataFrame(schemaRDD, mySchema.class);
df.createOrReplaceTempView("myView");
df.show();
jssc.start();
jssc.awaitTermination();
Spark processes the first batch of data, but no further batches. I am not using a JavaDStream here, which may be causing this behavior. How can I get a JavaDStream from the wholeTextFiles API?
Update, with the error:
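For reference, wholeTextFiles is a batch API on SparkContext: it scans the directory once when the RDD is evaluated and never produces a DStream, so new files are not picked up. To keep reading new files each batch interval, a file-based streaming source on the streaming context is needed. A minimal sketch, assuming the asker's directory path (note that textFileStream takes a directory rather than a glob, and emits file contents line by line, not whole files; whole-file semantics would need fileStream with a suitable Hadoop input format):

```java
import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

public class TextFileStreamSketch {
    public static void main(String[] args) throws InterruptedException {
        SparkConf sparkConf = new SparkConf().setAppName("My app").setMaster("local[2]");
        JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, Durations.seconds(15));

        // Streaming source: monitors the directory and emits files that appear
        // after the stream starts, one batch of new files per 15-second interval.
        JavaDStream<String> lines = jssc.textFileStream("C:/Users/my/files/abcd7/simple");

        // print() is an output operation, so the graph validates and jobs run.
        lines.print();

        jssc.start();
        jssc.awaitTermination();
    }
}
```

Note that file streaming sources only pick up files moved or created in the directory after the stream has started; files already present when jssc.start() runs are ignored.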
java.lang.IllegalArgumentException: requirement failed: No output operations registered, so nothing to execute
at scala.Predef$.require(Predef.scala:224)
at org.apache.spark.streaming.DStreamGraph.validate(DStreamGraph.scala:163)
at org.apache.spark.streaming.StreamingContext.validate(StreamingContext.scala:513)
at org.apache.spark.streaming.StreamingContext.liftedTree1$1(StreamingContext.scala:573)
at org.apache.spark.streaming.StreamingContext.start(StreamingContext.scala:572)
at org.apache.spark.streaming.api.java.JavaStreamingContext.start(JavaStreamingContext.scala:554)
at com.comcast.emm.vodip.WholeTextLocal.WholeTextLocal.main(WholeTextLocal.java:225)
Exception in thread "main" java.lang.IllegalArgumentException: requirement failed: No output operations registered, so nothing to execute
at scala.Predef$.require(Predef.scala:224)
at org.apache.spark.streaming.DStreamGraph.validate(DStreamGraph.scala:163)
at org.apache.spark.streaming.StreamingContext.validate(StreamingContext.scala:513)
at org.apache.spark.streaming.StreamingContext.liftedTree1$1(StreamingContext.scala:573)
at org.apache.spark.streaming.StreamingContext.start(StreamingContext.scala:572)
at org.apache.spark.streaming.api.java.JavaStreamingContext.start(JavaStreamingContext.scala:554)
You are not starting the stream. However you process your data, you should start the stream so that Spark actually begins reading and processing the data. – Mehraban
When I add jssc.start(); jssc.awaitTermination();, Spark processes the first batch of data, but no further batches. I have updated the question with the error I am getting. – AKC
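The "No output operations registered" failure in the trace above means jssc.start() was called on a streaming context with no DStream output operation (print, foreachRDD, saveAs...) registered; DataFrame operations on a plain batch RDD do not count. A sketch of moving the DataFrame logic into foreachRDD, assuming a hypothetical MySchema bean with a static parse(String) helper standing in for the asker's elided mapping logic:

```java
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.streaming.api.java.JavaDStream;

// Assumed elsewhere: JavaStreamingContext jssc, and a MySchema JavaBean
// with a static MySchema parse(String line) method (hypothetical names).
JavaDStream<String> fileLines = jssc.textFileStream("C:/Users/my/files/abcd7/simple");

// foreachRDD registers an output operation, so the DStream graph validates
// and the DataFrame logic runs on each 15-second batch of new files.
fileLines.foreachRDD(rdd -> {
    if (!rdd.isEmpty()) {
        SQLContext sqlContext = SQLContext.getOrCreate(rdd.context());
        Dataset<Row> df = sqlContext.createDataFrame(rdd.map(MySchema::parse), MySchema.class);
        df.createOrReplaceTempView("myView");
        df.show();
    }
});

jssc.start();
jssc.awaitTermination();
```

With this structure the context has a registered output operation before start(), so the IllegalArgumentException no longer fires and each batch is processed rather than only the first.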