2015-05-24 113 views
3

我已經從HBase的轉化的RDD:火花RDD發現通過鍵

VAL hbaseRDD:RDD [(字符串,數組[字符串])]其中tuple._1是rowkey。而數組是HBase中的值。

4929101-ACTIVE, ["4929101","2015-05-20 10:02:44","dummy1","dummy2"] 
4929102-ACTIVE, ["4929102","2015-05-20 10:02:44","dummy1","dummy2"] 
4929103-ACTIVE, ["4929103","2015-05-20 10:02:44","dummy1","dummy2"] 

我也有一個SchemaRDD(ID,DATE1,COL1,COL2,COL3)轉化爲

VAL refDataRDD:RDD [(字符串,陣列[字符串])]爲此,我將遍歷和檢查它是否存在於hbaseRDD:,

4929103, ["2015-05-21 10:03:44","EV01","col2","col3"] 
4929104, ["2015-05-21 10:03:44","EV02","col2","col3"] 

問題是

  • 如何檢查是否有鍵(tuple._1)/( 「4929103」)是否存在於hbaseRDD中並獲取相應的值(tuple._2)? - 我不能使用rdd.filter內PairRDD的查找功能,它會拋出「scala.MatchError:空」,但它的作品外

    val filteredRDD = rdd.filter(sqlRow => { 
        val hbaseLookup = hbaseRDD.lookup(sqlRow(0).toString + "-ACTIVE") 
        // if found, check if date1 of hbaseRDD < sqlRow(1) 
        // else if not found, retain row 
        true 
    }) 
    

    我不知道如果是這樣的問題,不過,因爲我也經歷了NPE,當我查找線路切換到:

    val sqlRowHbase = hbaseRDD.filter(row => { 
    

    注:這些行之前,我做了hbaseRDD.count。和hbaseRDD.lookup的rdd.filter

外面工作正常所以基本上,我試圖通過hbaseRDD鍵「發現」,並獲得該行/值。加入它們有點複雜,因爲兩個RDD中的某些值可能爲空。這取決於很多情況下哪些數據會保留哪些行。

回答

0

假設您需要查找的一組a_id包含在一個RDD中,我認爲您可以使用leftOuterJoin而不是迭代並查找每個值。

我在上面看到了關於date1潛在可變位置的評論。雖然我沒有在下面解決它,但我認爲這應該在查找本身之前通過某種特定的每行映射來處理。

如果我正確得到僞代碼,您的RDD爲(id, date),並且希望通過在hbase中查找數據來更新它,並更新日期,如果在hbase中爲該id找到一行,並且它的日期早於在refData中的一個。那是對的嗎?

如果是這樣,假設你有一些像這樣的裁判數據:

val refData = sc.parallelize(Array(
("4929103","2015-05-21 10:03:44"), 
("4929104","2015-05-21 10:03:44") 
)) 

而且從HBase的一些行數據:

val hbaseRDD = sc.parallelize(Array(
    ("4929101-ACTIVE", Array("4929101","2015-05-20 10:02:44")), 
    ("4929102-ACTIVE", Array("4929102","2015-05-20 10:02:44")), 
    ("4929103-ACTIVE", Array("4929103","2015-05-20 10:02:44")) 
)) 

然後,你可以從refData做到每個ID的查找到HBase的與一個簡單的leftOuterJoin,並找到每一行:更新日期如有必要:

refData 
    // looks up in Hbase all rows whose date1 a_id value matches the id in searchedIds 
    .leftOuterJoin(hbaseRDD.map{ case (rowkey, Array(a_id, date1)) => (a_id, date1)}) 

    // update the date in refData if date from hBase is earlier 
    .map { case (rowKey, (refDate, maybeRowDate)) => (rowKey, chooseDate (refDate, maybeRowDate)) } 
    .collect 


def chooseDate(refDate: String, rowDate: Option[String]) = rowDate match { 

    // if row not found in Hbase: keep ref date 
    case None => refDate 

    case Some(rDate) => 
    if (true) /* replace this by first parsing the date, then check if rowDate < refDate */ 
     rowDate 
    else 
     refDate 
} 
+0

嗨,我不是要更新r dd.date1,我試圖通過比較rdd和hbaseRDD的值來過濾rdd。我更新了帖子以澄清事情。 – sophie

+0

感謝您的澄清。 我認爲leftOuterJoin仍然是要走的路,它將比迭代rdd和在另一箇中查找值的計算花費少得多。 在如上定義的leftOuterJoin之後,基本上你在refData中每行有一個結果行,其中包含來自RefData的數據以及來自Hbase的數據(如果找到)。基於此,我相信您應該能夠根據日期編寫您描述的過濾/標記邏輯。 – Svend