I have a cluster of 9 machines with Apache Hadoop 2.7.2 and Spark 2.0.0 installed. Each machine runs an HDFS datanode and a Spark slave. One of the machines also runs the HDFS namenode and the Spark master. How can I make the Spark slaves process HDFS input files "locally" in this Hadoop + Spark cluster?
I have uploaded several TB of gz-archives to HDFS with replication = 2. It turned out that some of the archives are corrupt, and I want to find them. It looks like 'gunzip -t' can help, so I am trying to find a way to run a Spark application on the cluster so that each Spark executor tests only archives that are "local" to it (i.e., one of the replicas is located on the same machine the executor runs on) whenever possible. The following script runs, but sometimes the Spark executors process files that are "remote" in HDFS:
// Usage (after packaging a jar with mainClass set to 'com.qbeats.cortex.CommoncrawlArchivesTester' in spark.pom
// and placing this jar file into Spark's home directory):
// ./bin/spark-submit --master spark://LV-WS10.lviv:7077 spark-cortex-fat.jar spark://LV-WS10.lviv:7077 hdfs://LV-WS10.lviv:9000/commoncrawl 9
// This tests the gz-archives in the directory hdfs://LV-WS10.lviv:9000/commoncrawl for corruption,
// using a Spark cluster with the Spark master URL spark://LV-WS10.lviv:7077 and 9 Spark slaves.
package com.qbeats.cortex

import org.apache.hadoop.mapred.TextInputFormat
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapred.FileSplit
import org.apache.spark.rdd.HadoopRDD
import org.apache.spark.{SparkContext, SparkConf, AccumulatorParam}
import sys.process._

object CommoncrawlArchivesTester extends App {
  object LogAccumulator extends AccumulatorParam[String] {
    def zero(initialValue: String): String = ""
    def addInPlace(log1: String, log2: String) = if (log1.isEmpty) log2 else log1 + "\n" + log2
  }

  override def main(args: Array[String]): Unit = {
    if (args.length >= 3) {
      val appName = "CommoncrawlArchivesTester"
      val conf = new SparkConf().setAppName(appName).setMaster(args(0))
      conf.set("spark.executor.memory", "6g")
      conf.set("spark.shuffle.service.enabled", "true")
      conf.set("spark.dynamicAllocation.enabled", "true")
      conf.set("spark.dynamicAllocation.initialExecutors", args(2))
      val sc = new SparkContext(conf)

      val log = sc.accumulator(LogAccumulator.zero(""))(LogAccumulator)

      val text = sc.hadoopFile(args(1), classOf[TextInputFormat], classOf[LongWritable], classOf[Text])
      val hadoopRdd = text.asInstanceOf[HadoopRDD[LongWritable, Text]]
      val fileAndLine = hadoopRdd.mapPartitionsWithInputSplit { (inputSplit, iterator) =>
        val fileName = inputSplit.asInstanceOf[FileSplit].getPath.toString

        class FilePath extends Iterable[String] {
          def iterator = List(fileName).iterator
        }

        val result = (sys.env("HADOOP_PREFIX") + "/bin/hadoop fs -cat " + fileName) #| "gunzip -t" !

        println("Processed %s.".format(fileName))
        if (result != 0) {
          log.add(fileName)
          println("Corrupt: %s.".format(fileName))
        }
        (new FilePath).iterator
      }

      val result = fileAndLine.collect()

      println("Corrupted files:")
      println(log.value)
    }
  }
}
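To check whether node-local processing is even possible for each archive, I also list which datanodes hold its block replicas with the standard Hadoop FileSystem API and compare that against the executor hosts shown in the Spark UI. Here is a minimal sketch (separate from the script above; the namenode URI and directory are taken from the usage comments, and the object name is made up):

// Sketch: for every file under /commoncrawl, print the hosts that store its block replicas.
// The namenode URI matches the usage comments above; adjust it for your own cluster.
package com.qbeats.cortex

import java.net.URI
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

object CommoncrawlBlockLocations extends App {
  val fs = FileSystem.get(new URI("hdfs://LV-WS10.lviv:9000"), new Configuration())
  for (status <- fs.listStatus(new Path("/commoncrawl")) if status.isFile) {
    // One BlockLocation per HDFS block; getHosts() lists the datanodes holding its replicas.
    val hosts = fs.getFileBlockLocations(status, 0, status.getLen)
      .flatMap(_.getHosts).distinct.mkString(", ")
    println(s"${status.getPath.getName}: $hosts")
  }
}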
What would you suggest?
Added later:
I tried another script that gets the files from HDFS via textFile(). It looks like the Spark executors do not prefer "local" input files. Doesn't that contradict "Spark brings the code to the data, not the data to the code"?
// Usage (after packaging a jar with mainClass set to 'com.qbeats.cortex.CommoncrawlArchiveLinesCounter' in spark.pom)
// ./bin/spark-submit --master spark://LV-WS10.lviv:7077 spark-cortex-fat.jar spark://LV-WS10.lviv:7077 hdfs://LV-WS10.lviv:9000/commoncrawl 9
package com.qbeats.cortex

import org.apache.spark.{SparkContext, SparkConf}

object CommoncrawlArchiveLinesCounter extends App {
  override def main(args: Array[String]): Unit = {
    if (args.length >= 3) {
      val appName = "CommoncrawlArchiveLinesCounter"
      val conf = new SparkConf().setAppName(appName).setMaster(args(0))
      conf.set("spark.executor.memory", "6g")
      conf.set("spark.shuffle.service.enabled", "true")
      conf.set("spark.dynamicAllocation.enabled", "true")
      conf.set("spark.dynamicAllocation.initialExecutors", args(2))
      val sc = new SparkContext(conf)

      val nLines = sc.
        textFile(args(1) + "/*").
        mapPartitionsWithIndex((index, it) => {
          println("Processing partition %s".format(index))
          it
        }).
        count

      println(nLines)
    }
  }
}
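One thing I am trying as a workaround (a sketch based on my own assumptions, not a confirmed fix): raising spark.locality.wait so the scheduler waits longer for a NODE_LOCAL slot before falling back to a remote executor, and logging which worker actually processes each partition so the effect is visible:

// Sketch: increase the locality wait (Spark's default is 3s) and log the worker hostname
// per partition. The object name and the 30s values are assumptions, not tested settings.
// Submit with --master as in the usage examples above.
package com.qbeats.cortex

import java.net.InetAddress
import org.apache.spark.{SparkConf, SparkContext}

object CommoncrawlLocalityCheck extends App {
  val conf = new SparkConf().setAppName("CommoncrawlLocalityCheck")
  conf.set("spark.locality.wait", "30s")
  conf.set("spark.locality.wait.node", "30s")
  val sc = new SparkContext(conf)

  val hostPerPartition = sc.
    textFile("hdfs://LV-WS10.lviv:9000/commoncrawl/*").
    mapPartitionsWithIndex { (index, it) =>
      // Record the worker hostname and the number of lines seen in this partition.
      Iterator((index, InetAddress.getLocalHost.getHostName, it.size))
    }

  hostPerPartition.collect().sortBy(_._1).foreach(println)
  sc.stop()
}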
SAIF C, could you please explain in more detail?
I'm afraid Spark can't see the local versions of the files. You may have to add them using the '--files' option in spark-submit. When this is used, the local files are accessible to Spark from the executors' working directory.
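For reference, my understanding of the '--files' mechanism mentioned in the comment, as a minimal sketch (lookup.txt is a made-up example of a small side file, not one of the HDFS archives):

// Sketch of the '--files' mechanism suggested above. Submit with, e.g.:
//   ./bin/spark-submit --master spark://LV-WS10.lviv:7077 --files /path/to/lookup.txt side-file-example.jar
// lookup.txt is only an illustrative side file; the jar name and object name are made up.
package com.qbeats.cortex

import org.apache.spark.{SparkConf, SparkContext, SparkFiles}

object SideFileExample extends App {
  val sc = new SparkContext(new SparkConf().setAppName("SideFileExample"))
  val counts = sc.parallelize(1 to 4).map { _ =>
    // Files passed via --files are copied to each executor's working directory
    // and are resolved there through SparkFiles.get.
    val localCopy = SparkFiles.get("lookup.txt")
    scala.io.Source.fromFile(localCopy).getLines().size
  }.collect()
  println(counts.mkString(", "))
  sc.stop()
}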