我在使用Spark流編程時遇到了一些麻煩。因爲我想創建一個輸入流並使用自定義輸入格式來讀取它們。該定義是這樣的:JavaStreamingContext.fileStream的Java實現
def fileStream[K, V, F <: NewInputFormat[K, V]](
directory: String): JavaPairInputDStream[K, V] = {
implicit val cmk: ClassTag[K] =
implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[K]]
implicit val cmv: ClassTag[V] =
implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[V]]
implicit val cmf: ClassTag[F] =
implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[F]]
ssc.fileStream[K, V, F](directory)
}
如果我使用Scala的,那麼我將如下寫我的代碼:
val lines = ssc.fileStream[LongWritable, Text, TextInputFormat](dataDirectory)
但是,當我用java這樣的:
ClassTag<LongWritable> k = scala.reflect.ClassTag$.MODULE$.apply(LongWritable.class);
ClassTag<Text> v = scala.reflect.ClassTag$.MODULE$.apply(Text.class);
ClassTag<InputFormat<LongWritable, Text>> f = scala.reflect.ClassTag$.MODULE$.apply(TextInputFormat.class);
JavaPairInputDStream<LongWritable, Text> inputLines = ssc.fileStream<k, v, f>("dataDirectory);
我會遇到「fileStream無法解析或不是字段」的錯誤。 那麼,如何使用JavaStreamingContext.fileStream?
我創建了SSC與下面的代碼:
JavaStreamingContext ssc = new JavaStreamingContext(new SparkConf().setAppName("Spark Streaming Demo"), new Duration(3000));
謝謝!
當我正在尋找在互聯網這個問題,我發現有人寫了如下的代碼: 'JavaPairInputDStream inputLines = SSC FILESTREAM(DataDirectory目錄) ;' 所以,我試了一下,但根本不工作。 一個有趣的事情是[cgrothaus](https://github.com/cgrothaus/spark/blob/master/streaming/src/test/java/spark/streaming/JavaAPISuite.java)提供** testFileStream **函數然而,[apache](https://github.com/apache/spark/blob/master/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java)不會-_- –
stupig