2017-08-26

I am trying to handle common exceptions in Spark, such as a .map operation not working correctly on all elements of the data, or a FileNotFoundException. I have read all the existing questions on how to handle exceptions in Spark and Scala, as well as the following two posts:

https://rcardin.github.io/big-data/apache-spark/scala/programming/2016/09/25/try-again-apache-spark.html

https://www.nicolaferraro.me/2016/02/18/exception-handling-in-apache-spark

I have tried an inline Try statement around the line `attributes => mHealthUser(attributes(0).toDouble, attributes(1).toDouble, attributes(2).toDouble`, so that it reads `attributes => Try(mHealthUser(attributes(0).toDouble, attributes(1).toDouble, attributes(2).toDouble)`.

However, this won't compile; the compiler then no longer recognises the subsequent .toDF() statement. I have also tried a Java-like try { } catch { } block, but can't get the scoping right: df is then not returned. Does anyone know how to do this properly? Do I even need to handle these exceptions at all, since the Spark framework already seems to handle a FileNotFoundException without me adding one? But I would like to generate an error if, for example, the input file has the wrong number of columns compared to the number of fields in the schema.

Code below:

object DataLoadTest extends SparkSessionWrapper {

  /** Helper function to create a DataFrame from a textfile, re-used in subsequent tests */
  def createDataFrame(fileName: String): DataFrame = {

    import spark.implicits._

    try {
      val df = spark.sparkContext
        .textFile("/path/to/file" + fileName)
        .map(_.split("\\t"))
        // mHealthUser is the case class which defines the data schema
        .map(attributes => mHealthUser(attributes(0).toDouble, attributes(1).toDouble, attributes(2).toDouble,
          attributes(3).toDouble, attributes(4).toDouble,
          attributes(5).toDouble, attributes(6).toDouble, attributes(7).toDouble,
          attributes(8).toDouble, attributes(9).toDouble, attributes(10).toDouble,
          attributes(11).toDouble, attributes(12).toDouble, attributes(13).toDouble,
          attributes(14).toDouble, attributes(15).toDouble, attributes(16).toDouble,
          attributes(17).toDouble, attributes(18).toDouble, attributes(19).toDouble,
          attributes(20).toDouble, attributes(21).toDouble, attributes(22).toDouble,
          attributes(23).toInt))
        .toDF()
        .cache()
      df
    } catch {
      case ex: FileNotFoundException => println(s"File $fileName not found")
      case unknown: Exception => println(s"Unknown exception: $unknown")
    }
  }
}
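On the column-count point at the end of the question, one possible approach (a minimal sketch on a plain Seq, independent of Spark; `Row3` and `parseLine` are hypothetical names, the real mHealthUser case class has 24 fields) is to validate the field count per line and wrap the parse in Try, so that bad rows become Failures inside the .map instead of crashing the job:

```scala
import scala.util.{Try, Success, Failure}

// Hypothetical 3-field row for illustration only
case class Row3(a: Double, b: Double, c: Double)

// Reject lines whose field count doesn't match the schema, and wrap the
// numeric parsing in Try so malformed values become Failures, not crashes
def parseLine(expectedFields: Int)(line: String): Try[Row3] = {
  val fields = line.split("\\t")
  if (fields.length != expectedFields)
    Failure(new IllegalArgumentException(
      s"expected $expectedFields fields but got ${fields.length}"))
  else
    Try(Row3(fields(0).toDouble, fields(1).toDouble, fields(2).toDouble))
}

val lines = Seq("1.0\t2.0\t3.0", "4.0\t5.0", "x\ty\tz")
val (good, bad) = lines.map(parseLine(3)).partition(_.isSuccess)
// good has one Success; bad has two Failures (wrong field count, non-numeric value)
```

The same `parseLine` could then be used inside the Spark `.map`, with the successes kept and the failures counted or logged.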

Thanks for any suggestions. Thank you!

Answers


Another option would be to use the Try type in Scala.

For example:

def createDataFrame(fileName: String): Try[DataFrame] = {
  try {
    // create dataframe df
    Success(df)
  } catch {
    case ex: FileNotFoundException =>
      println(s"File $fileName not found")
      Failure(ex)
    case unknown: Exception =>
      println(s"Unknown exception: $unknown")
      Failure(unknown)
  }
}

Now, on the calling side, handle it like:

createDataFrame("file1.csv") match {
  case Success(df) => {
    // proceed with your pipeline
  }
  case Failure(ex) => // handle exception
}

This is slightly better than using Option, as the caller will know the reason for the failure and can handle it accordingly.
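Since Try composes with map and recover, the caller can also chain further steps onto the result instead of pattern-matching; a small non-Spark sketch (`parse` here is just a stand-in for createDataFrame):

```scala
import scala.util.{Try, Success}

// Stand-in for a function that may fail, like createDataFrame above
def parse(s: String): Try[Double] = Try(s.toDouble)

// map runs only on Success; recover supplies a fallback on Failure
val ok  = parse("2.5").map(_ * 2)
val bad = parse("oops").map(_ * 2).recover { case _: NumberFormatException => 0.0 }
// ok is Success(5.0); bad recovers to Success(0.0)
```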


Either you let the exception propagate out of the createDataFrame method (and handle it outside), or change the signature to return Option[DataFrame]:

def createDataFrame(fileName: String): Option[DataFrame] = {

  import spark.implicits._

  try {
    val df = spark.sparkContext
      .textFile("/path/to/file" + fileName)
      .map(_.split("\\t"))
      // mHealthUser is the case class which defines the data schema
      .map(attributes => mHealthUser(attributes(0).toDouble, attributes(1).toDouble, attributes(2).toDouble,
        attributes(3).toDouble, attributes(4).toDouble,
        attributes(5).toDouble, attributes(6).toDouble, attributes(7).toDouble,
        attributes(8).toDouble, attributes(9).toDouble, attributes(10).toDouble,
        attributes(11).toDouble, attributes(12).toDouble, attributes(13).toDouble,
        attributes(14).toDouble, attributes(15).toDouble, attributes(16).toDouble,
        attributes(17).toDouble, attributes(18).toDouble, attributes(19).toDouble,
        attributes(20).toDouble, attributes(21).toDouble, attributes(22).toDouble,
        attributes(23).toInt))
      .toDF()
      .cache()

    Some(df)
  } catch {
    case ex: FileNotFoundException =>
      println(s"File $fileName not found")
      None
    case unknown: Exception =>
      println(s"Unknown exception: $unknown")
      None
  }
}

Edit: on the calling side of createDataFrame there are several patterns. If you are processing several file names, you can for example do:

val dfs : Seq[DataFrame] = Seq("file1","file2","file3").map(createDataFrame).flatten 
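The .flatten in the line above silently drops the None results, keeping only the DataFrames that were created successfully. A minimal sketch of the same pattern, with a stand-in `create` function in place of createDataFrame:

```scala
// Stand-in for createDataFrame: succeeds for non-empty names, fails otherwise
def create(name: String): Option[String] =
  if (name.nonEmpty) Some(name.toUpperCase) else None

// flatten keeps only the Some values; flatMap(create) is equivalent
val results = Seq("file1", "", "file3").map(create).flatten
```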

If you are working with a single file name, you can do:

createDataFrame("file1.csv") match {
  case Some(df) => {
    // proceed with your pipeline
    val df2 = df.filter($"activityLabel" > 0).withColumn("binaryLabel", when($"activityLabel".between(1, 3), 0).otherwise(1))
  }
  case None => println("could not create dataframe")
}

@Raphael_Roth Thanks, that works, but it then breaks the next line, which is expecting a DataFrame, not an Option[DataFrame]: `val df2 = df.filter($"activityLabel" > 0).withColumn("binaryLabel", when($"activityLabel".between(1, 3), 0).otherwise(1))`. I have changed df2 to Option[DataFrame], but .filter now doesn't compile. – LucieCBurgess


@LucieCBurgess Read up on how to use Option in Scala, or show us the code where you use `createDataFrame`. Are you iterating over file names? –


@Raphael_Roth: I tried this: `val df: DataFrame = DataLoadTest.createDataFrame(fileName).getOrElse(None)`. This is in a separate class, so I am not reassigning df to a val. I get the error: "Expression of type Serializable doesn't conform to expected type sql.DataFrame" – LucieCBurgess
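The error in the last comment comes from `getOrElse(None)`: the fallback None is not a DataFrame, so the compiler widens the result to the nearest common supertype of the two (here Serializable). A sketch on plain Option, with Int standing in for DataFrame:

```scala
val opt: Option[Int] = Some(3)

// getOrElse needs a fallback of the SAME type, not None:
val widened = opt.getOrElse(None) // type widens to the common supertype, not Int
val value   = opt.getOrElse(0)    // stays Int

// or avoid unwrapping and keep working inside the Option
val doubled = opt.map(_ * 2).getOrElse(0)
```

For the DataFrame case, the fallback would have to be an actual DataFrame (e.g. something like spark.emptyDataFrame), or the caller should pattern-match on the Option as shown in the answer.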