2016-02-21 17 views
1

我正在RDD上進行一些模式匹配,並且只想選擇與模式匹配的重複行/記錄。以下是我目前有,如何在構建RDD時跳過與模式不匹配的條目

val idPattern = """Id="([^"]*)""".r.unanchored 
val typePattern = """PostTypeId="([^"]*)""".r.unanchored 
val datePattern = """CreationDate="([^"]*)""".r.unanchored 
val tagPattern = """Tags="([^"]*)""".r.unanchored 


val projectedPostsAnswers = postsAnswers.map {line => { 

     val id = line match {case idPattern(x) => x} 
     val typeId = line match {case typePattern(x) => x} 
     val date = line match {case datePattern(x) => x} 
     val tags = line match {case tagPattern(x) => x} 

     Post(Some(id),Some(typeId),Some(date),Some(tags)) 
    } 
} 

case class Post(Id: Option[String], Type: Option[String], CreationDate: Option[String], Tags: Option[String]) 

我只行/記錄興趣匹配所有的模式(即,具有所有這些四個字段的記錄)的。如何跳過那些不符合我的要求的行/記錄?

回答

2

您可以使用RDD.collect(scala.PartialFunction f)一步完成過濾和映射。

例如,如果你知道在你輸入這些字段的順序,可以合併正則表達式爲一體,並使用一個案例:

val pattern = """Id="([^"]*).*PostTypeId="([^"]*).*CreationDate="([^"]*).*Tags="([^"]*)""".r.unanchored 

val projectedPostsAnswers = postsAnswers.collect { 
    case pattern(id, typeId, date, tags) => Post(Some(id), Some(typeId), Some(date), Some(tags)) 
} 

返回RDD[Post]將只包含記錄此案件匹配。請注意,這個collect(PartialFunction)collect()沒有任何關係 - 它確實是而不是將整個數據集收集到驅動程序內存中。

1

使用過濾器API。

val projectedPostAnswers = postsAnswers.filter(line => f(line)).map{.... 

創建一個函數'f',爲您清理數據。確保函數返回true或false,就像過濾器用來決定是否傳遞記錄那樣。