2014-12-27 45 views
0

我在使用Spark流編程時遇到了一些麻煩。因爲我想創建一個輸入流並使用自定義輸入格式來讀取它們。該定義是這樣的:JavaStreamingContext.fileStream的Java實現

def fileStream[K, V, F <: NewInputFormat[K, V]](
     directory: String): JavaPairInputDStream[K, V] = { 
    implicit val cmk: ClassTag[K] = 
     implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[K]] 
    implicit val cmv: ClassTag[V] = 
     implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[V]] 
    implicit val cmf: ClassTag[F] = 
     implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[F]] 
    ssc.fileStream[K, V, F](directory) 
} 

如果我使用Scala的,那麼我將如下寫我的代碼:

val lines = ssc.fileStream[LongWritable, Text, TextInputFormat](dataDirectory) 

但是,當我用java這樣的:

ClassTag<LongWritable> k = scala.reflect.ClassTag$.MODULE$.apply(LongWritable.class); 
ClassTag<Text> v = scala.reflect.ClassTag$.MODULE$.apply(Text.class); 
ClassTag<InputFormat<LongWritable, Text>> f = scala.reflect.ClassTag$.MODULE$.apply(TextInputFormat.class); 
JavaPairInputDStream<LongWritable, Text> inputLines = ssc.fileStream<k, v, f>("dataDirectory); 

我會遇到「fileStream無法解析或不是字段」的錯誤。 那麼,如何使用JavaStreamingContext.fileStream?

我創建了SSC與下面的代碼:

JavaStreamingContext ssc = new JavaStreamingContext(new SparkConf().setAppName("Spark Streaming Demo"), new Duration(3000)); 

謝謝!

+0

當我正在尋找在互聯網這個問題,我發現有人寫了如下的代碼: 'JavaPairInputDStream inputLines = SSC FILESTREAM(DataDirectory目錄) ;' 所以,我試了一下,但根本不工作。 一個有趣的事情是[cgrothaus](https://github.com/cgrothaus/spark/blob/master/streaming/src/test/java/spark/streaming/JavaAPISuite.java)提供** testFileStream **函數然而,[apache](https://github.com/apache/spark/blob/master/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java)不會-_- – stupig

回答

0

FILESTREAM不能得到解決或無法在現場問題通過使用FILESTREAM不當造成的。當使用如下FILESTREAM:

JavaPairInputDStream<LongWritable, Text> inputLines = ssc.<LongWritable, Text, TestInputFormat>fileStream("dataDirectory); 

和TestInputFormat必須延伸OUTPUTFORMAT

public interface TestOutputFormat extends OutputFormat<LongWritable, Text> 

有以這種方式使用的時候沒有問題,但你必須實現TestInputFormat的使用舊的API類(ORG .apache.hadoop.mapred。*)。我沒有試過這個。^_^

0

您需要添加

import java.io.File; 

import java.io.*; 
+0

I已經在我的代碼中導入了java.io.File。 – stupig