2014-12-02 74 views
1

我正在SparkSQL上工作。我使用JavaPairRDD從HBase獲取數據,然後製作了一張地圖。在地圖中,我將所有的鑰匙保存到一個Set中。爲了強制完成這張地圖,我們遵循collect()。 在此之後,我使用Set中的值來執行其他操作。Spark懶惰轉換執行障礙

該程序可以在我的本地PC上完美工作。但是當我把它放到集羣(2名工人)時,就存在執行障礙。在地圖轉換之前,執行Set操作。

代碼流是這樣的: 從HBase的獲取數據:

JavaPairRDD<ImmutableBytesWritable, Result> hBaseRDD =  jsc.newAPIHadoopRDD(hbase_conf, 
       TableInputFormat.class, ImmutableBytesWritable.class, 
       Result.class); 

變換數據:

JavaRDD<Map<String, String>> data = hBaseRDD.map(
       new Function<Tuple2<ImmutableBytesWritable, Result>, Map<String, String>>(){ 
        public Map<String, String> call(
          Tuple2<ImmutableBytesWritable, Result> re) 
          throws Exception { 
         byte[] payload =re._2().getValue(Bytes.toBytes("ContentInfo"), Bytes.toBytes("Payload")); 
         Map<String, String> map = new ConcurrentHashMap<String, String>(); 

         String primaryKey = new String(re._1().get()); 
         map.put("primaryKey", primaryKey); 

         if(payload != null) 
          map.put("payload", new String(payload)); 

         Map<byte[], byte[]> tmpMetaMap = re._2().getFamilyMap(Bytes.toBytes("MetaInfo")); 
         if(tmpMetaMap != null){ 
          for(Entry<byte[], byte[]> entry : tmpMetaMap.entrySet()){ 

           String tmpKey = Bytes.toString(entry.getKey()); 
           String tmpValue = Bytes.toString(entry.getValue()); 

           map.put(tmpKey, tmpValue); 
    //save result to the set 
           keySet.add(tmpKey); 
          } 
         } 
         return map; 
        } 
       }); 

力上述地圖來運行:

data.collect(); 

獲取結果套裝:

StringBuilder sb = new StringBuilder(); 

     for(String fieldName: keySet){ 

      sb.append(fieldName).append(","); 
     } 

當我在本地運行的代碼,我可以得到所有的結果。但是當我在集羣上運行它時,sb沒有任何價值。

回答

0

這個問題是不相關的操作的順序,而是其中集羣中的這種行動正在發生。

火花,這裏有兩種類型的操作:變革與行動。

轉換轉換和RDD到另一個RDD通過應用一些功能的內容。這是一種純粹的功能性方法,無副作用。 動作採用RDD併產生其他內容,如文件或本地數據結構:這些操作將RDD的數據實現爲其他形式。

在這種情況下,轉換函數:map正在使用帶有副作用,因爲keyset預計會在映射轉換期間發生變異。 鑑於keyset在轉換函數的範圍之外定義,它會被序列化和發送到執行者,但任何突變發生遠程將不會在驅動程序恢復。

如果我們仔細想想,遺囑執行人將申請對數據的分區改造,使任何內容`鍵集」結尾,將只有每個分區的局部視圖。

模型正確的方法是重新定義在RDD變革和行動方面的操作。

從上面的代碼看來,我們想要將某些輸入轉換爲RDD[Map[String,String]],並且我們有興趣從驅動程序中收集所有不是「主鍵」和「有效負載」條目的鍵集合結果。

火花,這可能是這樣的:

// data = RDD[Map[String, String]] 
// first we get all the keys from all the maps 
val keys = data.map{entry => entry.keys} 
// now we collect that information on the driver 
val allKeys = keys.collect 
// we transform the resulting array into a set - this will remove duplicates by definition 
val allKeySet = allKeys.toSet 
// We need still to remove "primaryKey" and "payload" 
val keySet = fullKeySet.diff(Set("primaryKey","payload")) 

在Java代碼更詳細一點,但結構和思路是一致的。

+0

另一個問題:爲什麼在本地運行時可以設置keySet值? – user2965590 2014-12-03 16:08:46

0

您是如何定義鍵集的?嘗試將它定義爲靜態或以其它方式使用foreach代替map這將對所有的數據到這個DriverSide.Hope回答你的問題

+0

是的,我將keySet定義爲最終的靜態HashSet。 我也在hBaseRDD之後嘗試了一個簡單的foreach。只是一個習慣而沒有遵循。它在火花服務器上也不起作用。 – user2965590 2014-12-03 00:37:39