2015-05-20 38 views
0

我正在嘗試使用Spark RDD做一些文本處理。Spark - 如何正確處理RDD.map()方法中的錯誤情況?

輸入文件的格式是:

2015-05-20T18:30 <some_url>/?<key1>=<value1>&<key2>=<value2>&...&<keyn>=<valuen> 

我想提取文本中的某些字段,並將其轉換成CSV格式,如:

<value1>,<value5>,<valuek>,<valuen> 

下面的代碼是我該怎麼辦這個:

val lines = sc.textFile(s"s3n://${MY_BUCKET}/${MY_FOLDER}/test/*.gz") 
val records = lines.map { line => 
    val mp = line.split("&") 
       .map(_.split("=")) 
       .filter(_.length >= 2) 
       .map(t => (t(0), t(1))).toMap 

    (mp.get("key1"), mp.get("key5"), mp.get("keyk"), mp.get("keyn")) 
} 

我想知道,如果輸入文本的某一行是錯誤的格式或我無效,則map()函數無法返回有效值。這在文本處理中很常見,處理這個問題的最佳做法是什麼?

回答

9

爲了管理這個錯誤,你可以使用Scala的類flatMap操作中嘗試,在代碼:

val lines = sc.textFile(s"s3n://${MY_BUCKET}/${MY_FOLDER}/test/*.gz") 
    val records = lines.flatMap (line => 
     Try{ 
      val mp = line.split("&") 
       .map(_.split("=")) 
       .filter(_.length >= 2) 
       .map(t => (t(0), t(1))).toMap 

      (mp.get("key1"), mp.get("key5"), mp.get("keyk"), mp.get("keyn")) 
     } match { 
     case Success(map) => Seq(map) 
     case _ => Seq() 
    }) 

有了這個你只有「好的」,但如果你想兩者(錯誤和好的),我會建議使用返回斯卡拉無論是地圖功能,然後使用星火過濾器,代碼:

val lines = sc.textFile(s"s3n://${MY_BUCKET}/${MY_FOLDER}/test/*.gz") 
    val goodBadRecords = lines.map (line => 
     Try{ 
      val mp = line.split("&") 
       .map(_.split("=")) 
       .filter(_.length >= 2) 
       .map(t => (t(0), t(1))).toMap 

      (mp.get("key1"), mp.get("key5"), mp.get("keyk"), mp.get("keyn")) 
     } match { 
     case Success(map) => Right(map) 
     case Failure(e) => Left(e) 
    }) 
    val records = goodBadRecords.filter(_.isRight) 
    val errors = goodBadRecords.filter(_.isLeft) 

我希望這將是有益的

+0

對不起,你爲什麼要重新映射一試變成一個嗎?這種情況下的語義完全一樣。 –

相關問題