2017-02-18 17 views
0

我正在嘗試將hadoop代碼遷移到spark中。我已經有了一些預定義的函數,我應該可以在spark中重用,因爲它們僅僅是java代碼,沒有太多的hadoop依賴性。我有一個函數接受文本格式的輸入(空間數據 - 經度,緯度)並將它們轉換爲形狀(多邊形,線流等)。當我嘗試在Spark中讀取它時,我首先以String的形式讀取每行文件。然後將它們轉換爲文本,以便我可以使用我以前創建的函數。但是我有兩個疑問,首先看起來JavaRDD沒有使用文本,並且我正在收到一些問題。其次,將文本轉換爲形狀的功能不會返回任何內容。但我無法使用flatMap或任何其他映射技術。我甚至不確定我的方法是否正確。在JavaRDD中使用文本數據類型並在FlatMap中返回void

這裏是我的代碼模型:

/*function for converting Text to Shape*/ 
public interface TextSerializable { 
public Text toText(Text text); 
public void fromText(Text text); 
* Retrieve information from the given text. 
* @param text The text to parse 
*/ 
} 



/*Shape Class looks something like this*/ 

public interface Shape extends Writable, Cloneable, TextSerializable { 
/
* Returns minimum bounding rectangle for this shape. 
* @return The minimum bounding rectangle for this shape 
*/ 
public Rectangle getMBR(); 

/** 
* Gets the distance of this shape to the given point. 
* @param x The x-coordinate of the point to compute the distance to 
* @param y The y-coordinate of the point to compute the distance to 
* @return The Euclidean distance between this object and the given point 
*/ 
...... 
...... 
......*/ 

/*My code structure*/ 

SparkConf conf = new SparkConf().setAppName("XYZ").setMaster("local"); 
JavaSparkContext sc =new JavaSparkContext(conf); 

final Text text=new Text(); 

JavaRDD<String> lines = sc.textFile("ABC.csv"); 

lines.foreach(new VoidFunction<String>(){ 
public void call(String lines){ 
     text.set(lines); 
     System.out.println(text); 
    } 
    }); 

/*Problem*/ 
text.flatMap(new FlatMapFunction<Text>(){ 
    public Iterable<Shape> call(Shape s){ 
     s.fromText(text); 
     //return void; 
    } 

代碼的最後一行是錯誤的,但我不知道如何解決它。 JavaRDD可以與用戶定義的類一起使用(根據我的知識)。我甚至不確定我是否已經將字符串行轉換爲文本文本(如果RDD中允許的話)。我在Spark中是全新的。任何形式的幫助都會很棒。

回答

0

你完全脫離了這個概念。首先,您不能在任何對象上調用像map,flatmap等函數,只能從JavaRDD調用它們,而Text不是JavaRDD和Spark do支持文本,而不是您使用它的方式。

現在來到你的問題,因爲你想將字符串轉換爲文本格式,使用這樣的事情

SparkConf conf = new SparkConf().setAppName("Name of Application"); 
    JavaSparkContext sc = new JavaSparkContext(conf); 
    JavaRDD<String> logData = sc.textFile("replace with address of file"); 

/*This map function will take string as input because we are calling it on javaRDD logData and that logData return string type value. This map fucntion will give Text as output 
you can replace the return statement with logic of your toText function(However new Text(s) is also a way to convert string into Text) but remember use of return is mandatory so apply logic accordingly 
     */ 
     JavaRDD<Text> rddone = logData.map(new Function<String,Text>(){ 
      public Text call(String s) 
      {// type logic of your toText() function here 
      return new Text(s);}}); 

現在,當我們調用flatmap功能在JavaRDD rddone將需要輸入的文本,因爲rddone的輸出是文本,它可以給你任何你想要的輸出。

/* This flatmap fucntion will take Text as input and will give iterator over object */ 
    JavaRDD <Object> empty = rddone.flatMap(new FlatMapFunction<Text,Object>(){ 
      public Iterator<Object> call(Text te) 
      { 
       // here you can call your fromText(te) method. 
       return null; 
     } 
     }); 

也請參閱有關更多詳細信息http://spark.apache.org/docs/latest/programming-guide.html

http://spark.apache.org/docs/latest/api/java/index.html?org/apache/spark/api/java/JavaRDD.html

+0

非常感謝這些鏈接。我有一個想法如何去做。 – SGh

相關問題