2016-10-13 80 views
1

我試圖合併多個JavaRDD,但我只得到2合併可以有人親切的幫助。我一直在努力,但總體而言,我希望能夠獲得多個集合並使用sqlContext創建一個組並打印出所有結果。合併多個JavaRDD

這裏我的代碼

JavaRDD<AppLog> logs = mapCollection(sc, "mongodb://hadoopUser:[email protected]:27017/hbdata.ppa_logs").union(
           mapCollection(sc, "mongodb://hadoopUser:[email protected]:27017/hbdata.fav_logs").union(
           mapCollection(sc, "mongodb://hadoopUser:[email protected]:27017/hbdata.pps_logs").union(
            mapCollection(sc, "mongodb://hadoopUser:[email protected]:27017/hbdata.dd_logs").union(
            mapCollection(sc, "mongodb://hadoopUser:[email protected]:27017/hbdata.ppt_logs") 
           ) 
           ) 
          ) 
         ); 


public JavaRDD<AppLog> mapCollection(JavaSparkContext sc ,String uri){ 

    Configuration mongodbConfig = new Configuration(); 
    mongodbConfig.set("mongo.job.input.format", "com.mongodb.hadoop.MongoInputFormat"); 
    mongodbConfig.set("mongo.input.uri", uri); 

    JavaPairRDD<Object, BSONObject> documents = sc.newAPIHadoopRDD(
     mongodbConfig,   // Configuration 
     MongoInputFormat.class, // InputFormat: read from a live cluster. 
     Object.class,    // Key class 
     BSONObject.class   // Value class 
    ); 

    return documents.map(

     new Function<Tuple2<Object, BSONObject>, AppLog>() { 

      public AppLog call(final Tuple2<Object, BSONObject> tuple) { 
       AppLog log = new AppLog(); 
       BSONObject header = 
       (BSONObject) tuple._2(); 

       log.setTarget((String) header.get("target")); 
       log.setAction((String) header.get("action")); 

       return log; 
      } 
     } 
    ); 
} 

//打印收藏 SQLContext sqlContext =新org.apache.spark.sql.SQLContext(SC);

DataFrame logsSchema = sqlContext.createDataFrame(logs, AppLog.class); 
    logsSchema.registerTempTable("logs"); 

    DataFrame groupedMessages = sqlContext.sql(
     "select * from logs"); 
     // "select target, action, Count(*) from logs group by target, action"); 

     // "SELECT to, body FROM messages WHERE to = \"[email protected]\""); 



    groupedMessages.show(); 

    logsSchema.printSchema(); 
+0

您如何認識到只有兩個RDD是統一的? 第二個問題:爲什麼你要以遞歸方式調用聯合(我知道它不是遞歸執行的),而不是函數式的寫作風格?我的意思是rdd1.union(rdd2).union(rdd3)等等。聯盟的返回類型應該是rdd。在你的寫作風格 - > mapCollection(STH1,STH2).union(mapCollection(STH1,STH2))等 – hasan

+0

嗨,我使用的sqlcontext打印RDD並且只顯示了兩個運行我的代碼後RDD。我已經更新了上面的問題,以便您瞭解如何打印數據。我使用遞歸樣式代碼作爲ive嘗試了很多方法,但它沒有工作,這是最接近的一個來。任何建議? –

+0

對我來說這似乎是正確的。唯一讓我困惑的是遞歸的寫作風格。也許你可以加載每個日誌RDD和後來做的加入和打印計數或每個RDD的第一要素,看看他們是不是空的。功能寫作風格將導致更好的可讀性,只是建議 – hasan

回答

2

如果您想合併多個JavaRDDs,只需使用sc.union(rdd1,rdd2,..)代替rdd1.union(rdd2).union(rdd3)

另請選擇此項RDD.union vs SparkContex.union

+0

感謝您的幫助。它看起來像我仍然只得到了前兩個RDD加入也不太清楚爲什麼 –

+0

即使使用sc.uniom後(RDD1集,RDD2,...) –

+0

有人請打我。我注意到我打印出我的值的底部,它說「只顯示前20行」。過去6-7個小時我一直在爲此工作 –