2016-08-17

I have a cluster of 9 machines with Apache Hadoop 2.7.2 and Spark 2.0.0 installed. Each machine runs an HDFS datanode and a Spark slave. One of the machines also runs the HDFS namenode and the Spark master. How can I make the Spark slaves use HDFS input files that are "local" to them in a Hadoop + Spark cluster?

I have uploaded several terabytes of gz-archives into HDFS with replication = 2. It turned out that some of the archives are corrupt, and I'd like to find them. It looks like 'gunzip -t' can help. So I tried to find a way to run a Spark application on the cluster so that, whenever possible, each Spark executor tests archives that are "local" to it (i.e., one of the replicas is located on the same machine the executor runs on). The following script runs, but sometimes the Spark executors process files that are "remote" in HDFS:

// Usage (after packaging a jar with mainClass set to 'com.qbeats.cortex.CommoncrawlArchivesTester' in spark.pom 
// and placing this jar file into Spark's home directory): 
// ./bin/spark-submit --master spark://LV-WS10.lviv:7077 spark-cortex-fat.jar spark://LV-WS10.lviv:7077 hdfs://LV-WS10.lviv:9000/commoncrawl 9 
// This tests the gz-archives in the directory hdfs://LV-WS10.lviv:9000/commoncrawl for corruption,
// using a Spark cluster with the master URL spark://LV-WS10.lviv:7077 and 9 Spark slaves.

package com.qbeats.cortex 

import org.apache.hadoop.mapred.TextInputFormat 
import org.apache.hadoop.io.{LongWritable, Text} 
import org.apache.hadoop.mapred.FileSplit 
import org.apache.spark.rdd.HadoopRDD 
import org.apache.spark.{SparkContext, SparkConf, AccumulatorParam} 
import sys.process._ 

object CommoncrawlArchivesTester extends App {
  // Accumulator that collects the names of corrupt archives reported by the executors.
  object LogAccumulator extends AccumulatorParam[String] {
    def zero(initialValue: String): String = ""
    def addInPlace(log1: String, log2: String) = if (log1.isEmpty) log2 else log1 + "\n" + log2
  }

  override def main(args: Array[String]): Unit = {
    if (args.length >= 3) {
      val appName = "CommoncrawlArchivesTester"
      val conf = new SparkConf().setAppName(appName).setMaster(args(0))
      conf.set("spark.executor.memory", "6g")
      conf.set("spark.shuffle.service.enabled", "true")
      conf.set("spark.dynamicAllocation.enabled", "true")
      conf.set("spark.dynamicAllocation.initialExecutors", args(2))
      val sc = new SparkContext(conf)

      val log = sc.accumulator(LogAccumulator.zero(""))(LogAccumulator)

      val text = sc.hadoopFile(args(1), classOf[TextInputFormat], classOf[LongWritable], classOf[Text])
      val hadoopRdd = text.asInstanceOf[HadoopRDD[LongWritable, Text]]

      // On each executor, stream the split's archive through 'gunzip -t' and
      // record the file name in the accumulator if the integrity test fails.
      val fileAndLine = hadoopRdd.mapPartitionsWithInputSplit { (inputSplit, iterator) =>
        val fileName = inputSplit.asInstanceOf[FileSplit].getPath.toString
        class FilePath extends Iterable[String] {
          def iterator = List(fileName).iterator
        }
        val result = ((sys.env("HADOOP_PREFIX") + "/bin/hadoop fs -cat " + fileName) #| "gunzip -t").!

        println("Processed %s.".format(fileName))
        if (result != 0) {
          log.add(fileName)
          println("Corrupt: %s.".format(fileName))
        }
        (new FilePath).iterator
      }

      val result = fileAndLine.collect()

      println("Corrupted files:")
      println(log.value)
    }
  }
}
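One way to see why some reads end up remote is to check, on the driver, which hosts hold a replica of each input split. A minimal sketch, assuming the same hadoopRdd as in the script above (preferredLocations is the standard RDD method for this):

// Driver-side check: print the hosts that hold a replica of each input split.
// If a task for a partition runs on a host that is not in this list, that read is remote.
hadoopRdd.partitions.foreach { p =>
  val hosts = hadoopRdd.preferredLocations(p)
  println("Partition %d: preferred hosts = %s".format(p.index, hosts.mkString(", ")))
}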

What would you suggest?


Added later:

I tried another script, which reads files from HDFS via textFile(). It looks like the Spark executors do not prefer input files that are "local" to them. Doesn't this contradict "Spark brings the code to the data, not the data to the code"?

// Usage (after packaging a jar with mainClass set to 'com.qbeats.cortex.CommoncrawlArchiveLinesCounter' in spark.pom) 
// ./bin/spark-submit --master spark://LV-WS10.lviv:7077 spark-cortex-fat.jar spark://LV-WS10.lviv:7077 hdfs://LV-WS10.lviv:9000/commoncrawl 9 

package com.qbeats.cortex 

import org.apache.spark.{SparkContext, SparkConf} 

object CommoncrawlArchiveLinesCounter extends App {
  override def main(args: Array[String]): Unit = {
    if (args.length >= 3) {
      val appName = "CommoncrawlArchiveLinesCounter"
      val conf = new SparkConf().setAppName(appName).setMaster(args(0))
      conf.set("spark.executor.memory", "6g")
      conf.set("spark.shuffle.service.enabled", "true")
      conf.set("spark.dynamicAllocation.enabled", "true")
      conf.set("spark.dynamicAllocation.initialExecutors", args(2))
      val sc = new SparkContext(conf)

      // Count the lines of all archives under the input directory, logging which
      // partition each executor processes.
      val nLines = sc.
        textFile(args(1) + "/*").
        mapPartitionsWithIndex((index, it) => {
          println("Processing partition %s".format(index))
          it
        }).
        count
      println(nLines)
    }
  }
}
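If the scheduler is launching tasks before data-local executor slots are free, one knob to experiment with (an assumption on my part, not something verified here) is Spark's locality wait, which controls how long a task waits for a data-local slot before being launched at a worse locality level:

// Sketch only: make the scheduler wait longer before giving up on node-local
// placement (the default for spark.locality.wait is 3s; these values are illustrative).
conf.set("spark.locality.wait", "30s")
conf.set("spark.locality.wait.node", "30s")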

SAIF C, could you please explain in more detail?


I'm afraid Spark cannot see the local version of the file. You may need to add them using the '--files' option of spark-submit. When this is used, the local files are accessible to Spark from the executor's root directory. –
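For reference, the commenter's suggestion would look roughly like the sketch below (the path is illustrative; note that --files copies the listed local files into each executor's working directory, which is different from reading HDFS blocks locally):

./bin/spark-submit --master spark://LV-WS10.lviv:7077 \
  --files /local/path/to/archive.gz \
  spark-cortex-fat.jar spark://LV-WS10.lviv:7077 hdfs://LV-WS10.lviv:9000/commoncrawl 9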
