
Accumulator fails on cluster, works locally

Working through the official Spark documentation, there is an example of an accumulator used in a foreach call directly on an RDD:

scala> val accum = sc.accumulator(0) 
accum: spark.Accumulator[Int] = 0 

scala> sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum += x) 
... 
10/09/29 18:41:08 INFO SparkContext: Tasks finished in 0.317106 s 

scala> accum.value 
res2: Int = 10 

I implemented my own accumulator:

val myCounter = sc.accumulator(0) 

val myRDD = sc.textFile(inputpath) // :spark.RDD[String] 

myRDD.flatMap(line => foo(line)) // line 69 

def foo(line: String) = { 
    myCounter += 1 // line 82 throwing NullPointerException 
    // compute something on the input 
} 
println(myCounter.value) 

In a local setup this works fine. However, when I run the job on a standalone Spark cluster with several machines, the workers throw a NullPointerException on the line that increments the accumulator myCounter:

13/07/22 21:56:09 ERROR executor.Executor: Exception in task ID 247 
java.lang.NullPointerException 
    at MyClass$.foo(MyClass.scala:82) 
    at MyClass$$anonfun$2.apply(MyClass.scala:67) 
    at MyClass$$anonfun$2.apply(MyClass.scala:67) 
    at scala.collection.Iterator$$anon$21.hasNext(Iterator.scala:440) 
    at scala.collection.Iterator$$anon$19.hasNext(Iterator.scala:400) 
    at spark.PairRDDFunctions.writeToFile$1(PairRDDFunctions.scala:630) 
    at spark.PairRDDFunctions$$anonfun$saveAsHadoopDataset$2.apply(PairRDDFunctions.scala:640) 
    at spark.PairRDDFunctions$$anonfun$saveAsHadoopDataset$2.apply(PairRDDFunctions.scala:640) 
    at spark.scheduler.ResultTask.run(ResultTask.scala:77) 
    at spark.executor.Executor$TaskRunner.run(Executor.scala:98) 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) 
    at java.lang.Thread.run(Thread.java:722) 

My question is: can accumulators only be used in "top-level" anonymous functions that are applied directly to an RDD, and not in nested functions? If so, why does my call succeed locally but fail on the cluster?

Edit: increased the verbosity of the exception.

Can you post more of the worker's stack trace? –

Have you tried `sc.broadcast(myCounter)`? – Noah

Doesn't `broadcast` return a read-only value? From the [official API docs](http://spark-project.org/docs/latest/api/core/index.html#spark.SparkContext): "Broadcast a read-only variable to the cluster, returning a Broadcast object for reading it in distributed functions. The variable will be sent to each cluster only once." – ptikobj
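
For what it's worth, `broadcast` does return a read-only value; here is a minimal sketch (with a made-up lookup map) of how one is used:

// A read-only value, shipped to every executor once (hypothetical example).
val lookup = sc.broadcast(Map("a" -> 1, "b" -> 2))

// Executors read it through .value; there is no way to write back to the driver,
// so a broadcast variable cannot replace the accumulator here.
val mapped = sc.parallelize(Seq("a", "b")).map(key => lookup.value.getOrElse(key, 0))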

Answers

If you define the function like this:

def foo(line: String, myc: org.apache.spark.Accumulator[Int]) = {
    myc += 1 // the accumulator arrives as an explicit argument, not a captured field
}

then call it like this:

foo(line, myCounter) 
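
Put together, a minimal sketch of this fix in the context of the original job; `inputpath` and the body of `foo` are placeholders for the asker's code, and a `count()` action is added so the lazy `flatMap` actually runs:

val myCounter = sc.accumulator(0)
val myRDD = sc.textFile(inputpath) // :spark.RDD[String]

// The accumulator is now an explicit argument instead of a captured field.
def foo(line: String, myc: org.apache.spark.Accumulator[Int]): Seq[String] = {
    myc += 1
    Seq(line) // placeholder for the real per-line computation
}

myRDD.flatMap(line => foo(line, myCounter)).count() // count() forces evaluation
println(myCounter.value)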

That seems right; you can pass the accumulator you created into the method. – pulasthi

If you use `flatMap`, `myCounter` will not be updated, because `flatMap` is a lazy function; nothing runs until an action is invoked. You can use this code:

myRDD.foreach(line => foo(line))
def foo(line: String) = { myCounter += 1 }
println(myCounter.value)
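
To see the laziness at work: a transformation such as `flatMap` only records the computation, and the accumulator is untouched until an action runs the job. A minimal sketch:

val counter = sc.accumulator(0)
val words = sc.parallelize(Seq("a b", "c")).flatMap { line =>
    counter += 1         // not executed yet: flatMap is lazy
    line.split(" ")
}
println(counter.value)   // 0 - no action has run
words.count()            // the action triggers the job
println(counter.value)   // 2 - one increment per input line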

The same happened in my case: the accumulator was null inside the closure when I created the Spark application with `extends App`, as shown below (presumably because `App` relies on `DelayedInit`, so vals defined in its body may not be initialized yet when the closure is serialized):

object AccTest extends App {

    val conf = new SparkConf().setAppName("AccTest").setMaster("yarn-client")
    val sc = new SparkContext(conf)
    sc.setLogLevel("ERROR")

    val accum = sc.accumulator(0, "My Accumulator")
    sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum += x)

    println("count:" + accum.value)

    sc.stop()
}

I replaced `extends App` with a main() method, and it worked on a YARN cluster under HDP 2.4:

object AccTest {
    def main(args: Array[String]): Unit = {

        val conf = new SparkConf().setAppName("AccTest").setMaster("yarn-client")
        val sc = new SparkContext(conf)
        sc.setLogLevel("ERROR")

        val accum = sc.accumulator(0, "My Accumulator")
        sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum += x)

        println("count:" + accum.value)

        sc.stop()
    }
}