0

我正在將多個html文件讀入Spark中的數據框。 我使用一個自定義轉換的HTML元素列在數據幀UDF在Apache Spark中爲每行迭代添加作用域變量

val dataset = spark 
    .sparkContext 
    .wholeTextFiles(inputPath) 
    .toDF("filepath", "filecontent") 
    .withColumn("biz_name", parseDocValue(".biz-page-title")('filecontent)) 
    .withColumn("biz_website", parseDocValue(".biz-website a")('filecontent)) 

    ... 

    def parseDocValue(cssSelectorQuery: String) = 
    udf((html: String) => Jsoup.parse(html).select(cssSelectorQuery).text()) 

這完美的作品,但每個withColumn調用將導致HTML字符串,這是多餘的解析。

有沒有辦法(不使用查找表或這樣),我可以根據每行的「filecontent」列生成1解析文檔(Jsoup.parse(html)),並認爲可用於在數據幀的所有withColumn電話?

或者我不應該嘗試使用DataFrames,只是使用RDD的?

+0

您可以使用示例文本字符串更新嗎? –

+0

我在本質上存在「整個文本文件」的非並行化問題(例如,在64個核心羣集上有2個執行文件,甚至在重新分區之前),所以我可能會重寫整個文件。當我解決這個問題時,我會更新並看看這些建議。對不起, –

+0

你有沒有得到它解決或其他? –

回答

0

所以最終的答案實際上很簡單:

只需在行上映射並在其中創建對象

def docValue(cssSelectorQuery: String, attr: Option[String] = None)(implicit document: Document): Option[String] = { 
    val domObject = document.select(cssSelectorQuery) 

    val domValue = attr match { 
     case Some(a) => domObject.attr(a) 
     case None => domObject.text() 
    } 

    domValue match { 
     case x if x == null || x.isEmpty => None 
     case y => Some(y) 
    } 
    } 

val dataset = spark 
     .sparkContext 
     .wholeTextFiles(inputPath, minPartitions = 265) 
     .map { 
     case (filepath, filecontent) => { 
      implicit val document = Jsoup.parse(filecontent) 

      val customDataJson = docJson(filecontent, customJsonRegex) 


      DataEntry(
      biz_name = docValue(".biz-page-title"), 
      biz_website = docValue(".biz-website a"), 
      url = docValue("meta[property=og:url]", attr = Some("content")), 
      ... 
      filename = Some(fileName(filepath)), 
      fileTimestamp = Some(fileTimestamp(filepath)) 
     ) 
     } 
     } 
     .toDS() 
0

我可能會改寫如下,做分析和選擇一氣呵成,並把它們放到一個臨時列:

val dataset = spark 
    .sparkContext 
    .wholeTextFiles(inputPath) 
    .withColumn("temp", parseDocValue(Array(".biz-page-title", ".biz-website a"))('filecontent)) 
    .withColumn("biz_name", col("temp")(0)) 
    .withColumn("biz_website", col("temp")(1)) 
    .drop("temp") 

def parseDocValue(cssSelectorQueries: Array[String]) = 
udf((html: String) => { 
    val j = Jsoup.parse(html) 
    cssSelectorQueries.map(query => j.select(query).text())}) 
相關問題