我正在SparkSQL上工作。我使用JavaPairRDD從HBase獲取數據,然後製作了一張地圖。在地圖中,我將所有的鑰匙保存到一個Set中。爲了強制完成這張地圖,我們遵循collect()。 在此之後,我使用Set中的值來執行其他操作。Spark懶惰轉換執行障礙
該程序可以在我的本地PC上完美工作。但是當我把它放到集羣(2名工人)時,就存在執行障礙。在地圖轉換之前,執行Set操作。
代碼流是這樣的: 從HBase的獲取數據:
JavaPairRDD<ImmutableBytesWritable, Result> hBaseRDD = jsc.newAPIHadoopRDD(hbase_conf,
TableInputFormat.class, ImmutableBytesWritable.class,
Result.class);
變換數據:
JavaRDD<Map<String, String>> data = hBaseRDD.map(
new Function<Tuple2<ImmutableBytesWritable, Result>, Map<String, String>>(){
public Map<String, String> call(
Tuple2<ImmutableBytesWritable, Result> re)
throws Exception {
byte[] payload =re._2().getValue(Bytes.toBytes("ContentInfo"), Bytes.toBytes("Payload"));
Map<String, String> map = new ConcurrentHashMap<String, String>();
String primaryKey = new String(re._1().get());
map.put("primaryKey", primaryKey);
if(payload != null)
map.put("payload", new String(payload));
Map<byte[], byte[]> tmpMetaMap = re._2().getFamilyMap(Bytes.toBytes("MetaInfo"));
if(tmpMetaMap != null){
for(Entry<byte[], byte[]> entry : tmpMetaMap.entrySet()){
String tmpKey = Bytes.toString(entry.getKey());
String tmpValue = Bytes.toString(entry.getValue());
map.put(tmpKey, tmpValue);
//save result to the set
keySet.add(tmpKey);
}
}
return map;
}
});
力上述地圖來運行:
data.collect();
獲取結果套裝:
StringBuilder sb = new StringBuilder();
for(String fieldName: keySet){
sb.append(fieldName).append(",");
}
當我在本地運行的代碼,我可以得到所有的結果。但是當我在集羣上運行它時,sb沒有任何價值。
另一個問題:爲什麼在本地運行時可以設置keySet值? – user2965590 2014-12-03 16:08:46