2015-06-28 22 views
1

我有一個RDD:與JavaPairRDD實現的Hadoop Map作爲星火路

JavaPairRDD<Long, ViewRecord> myRDD 

它通過newAPIHadoopRDD方法創建的。我有我想要實現它在星火路一個已經存在的地圖功能:

LongWritable one = new LongWritable(1L); 

protected void map(Long key, ViewRecord viewRecord, Context context) 
    throws IOException ,InterruptedException { 

    String url = viewRecord.getUrl(); 
    long day = viewRecord.getDay(); 

    tuple.getKey().set(url); 
    tuple.getValue().set(day); 

    context.write(tuple, one); 
}; 

PS:元組來源於:

KeyValueWritable<Text, LongWritable> 

,並可以在這裏找到:TextLong.java

回答

2

我不知道什麼元組是,但如果你只是想將記錄映射到關鍵字(url, day)和值1L的元組,你可以這樣做:

result = myRDD 
    .values() 
    .mapToPair(viewRecord -> { 
     String url = viewRecord.getUrl(); 
     long day = viewRecord.getDay(); 
     return new Tuple2<>(new Tuple2<>(url, day), 1L); 
    }) 


//java 7 style 
JavaPairRDD<Pair, Long> result = myRDD 
     .values() 
     .mapToPair(new PairFunction<ViewRecord, Pair, Long>() { 
         @Override 
         public Tuple2<Pair, Long> call(ViewRecord record) throws Exception { 
          String url = record.getUrl(); 
          Long day = record.getDay(); 

          return new Tuple2<>(new Pair(url, day), 1L); 
         } 
        } 
     ); 
+0

我在我的問題中添加了原始元組類。由於lambda表達式在當前語言級別不受支持,您能否提供Java 7樣式的示例? – kamaci

+0

@kamaci我添加了Java 7版本。 – abalcerek

+0

我已經實現它: JavaRDD > result = myRDD.values()。map( new Function >(){ @Override 公共Tuple2 <字符串,龍>呼叫(viewRecord viewRecord)拋出異常{ 字符串URL = viewRecord.getUrl(); \t整天= viewRecord.getDay();返回 新Tuple2 <>(URL,日); } }); 我找不到你提到的Pair和Record類。我的函數編譯但它會拋出一個錯誤:'SparkException:任務不可序列化',但它似乎是另一個問題。 – kamaci