2013-05-30 125 views
11

我正在玩Spark。這是來自網站的默認配置,默認配置,集羣模式,一個工作者(我的本地主機)。我閱讀關於安裝的文檔,一切似乎都很好。火花集羣在更大的輸入上失敗,適用於小型

我有一個CSV文件(各種大小,1000 - 100萬行)。如果我用小輸入文件(例如1000行)運行我的應用程序,一切都很好,程序在幾秒鐘內完成併產生預期的輸出。 但是,當我提供一個更大的文件(100.000行,或100萬),執行失敗。我試圖挖掘日誌,但沒有多大幫助(它重複了整個過程大約9-10次,之後以失敗退出,而且還有一些錯誤與從某個null源獲取失敗相關)。

結果第一個JavaRDD返回的Iterable對我來說是可疑的。如果我返回一個硬編碼的單例列表(如res.add(「something」); return res;),即使有一百萬行,一切都很好。但是,如果我添加了我想要的所有密鑰(28個字符長度爲6-20個字符),則該過程將失敗只有與大的輸入。 問題是,我需要所有這些鍵,這是實際的業務邏輯。

我使用Linux amd64,四核,8GB內存。最新的Oracle Java7 JDK。星火配置:

SPARK_WORKER_MEMORY=4g 
SPARK_MEM=3g 
SPARK_CLASSPATH=$SPARK_CLASSPATH:/my/super/application.jar 

不得不提的是,當我啓動程序,它說:

13/05/30 11:41:52 WARN spark.Utils: Your hostname, *** resolves to a loopback address: 127.0.1.1; using 192.168.1.157 instead (on interface eth1) 
13/05/30 11:41:52 WARN spark.Utils: Set SPARK_LOCAL_IP if you need to bind to another address 

這是我的計劃。它基於最小修改的JavaWordCount示例。

public final class JavaWordCount 
{ 
    public static void main(final String[] args) throws Exception 
    { 
     final JavaSparkContext ctx = new JavaSparkContext(args[0], "JavaWordCount", 
      System.getenv("SPARK_HOME"), new String[] {"....jar" }); 

     final JavaRDD<String> words = ctx.textFile(args[1], 1).flatMap(new FlatMapFunction<String, String>() { 

      @Override 
      public Iterable<String> call(final String s) 
      { 
       // parsing "s" as the line, computation, building res (it's a List<String>) 
       return res; 
      } 
     }); 

     final JavaPairRDD<String, Integer> ones = words.map(new PairFunction<String, String, Integer>() { 

      @Override 
      public Tuple2<String, Integer> call(final String s) 
      { 
       return new Tuple2<String, Integer>(s, 1); 
      } 
     }); 
     final JavaPairRDD<String, Integer> counts = ones.reduceByKey(new Function2<Integer, Integer, Integer>() { 

      @Override 
      public Integer call(final Integer i1, final Integer i2) 
      { 
       return i1 + i2; 
      } 
     }); 

     counts.collect(); 

     for (Tuple2<?, ?> tuple : counts.collect()) { 
      System.out.println(tuple._1 + ": " + tuple._2); 
     } 
    } 
} 
+0

改變之前Spark系統屬性,你的工作失敗時會發生什麼異常/錯誤? –

+0

在spark-users組中,我得到了答案:.collect()將觸發每個(temp)RDD的集合。這是真正的問題。線程與解決方案在這裏:http://stackoverflow.com/questions/16832429/spark-cluster-fails-on-bigger-input-works-well-for-small?noredirect=1#comment24468201_16832429 – gyorgyabraham

+1

我GOOGLE了年齡和年齡嘗試爲了解決我的問題,這個問題的答案解決了我的問題,所以請編輯您的問題,在您的問題中包含「org.apache.spark.SparkException:與MapOutputTracker進行通信時出錯」,以便將來爲其他人提供更容易的Google搜索。 – samthebest

回答

13

我設法通過設置spark.mesos.coarse到真正的財產,以解決它。更多信息here

更新:我一直在玩Spark幾個小時。這些設置對我有點幫助,但似乎幾乎不可能在一臺機器上處理大約1000萬行文字。

System.setProperty("spark.serializer", "spark.KryoSerializer"); // kryo is much faster 
System.setProperty("spark.kryoserializer.buffer.mb", "256"); // I serialize bigger objects 
System.setProperty("spark.mesos.coarse", "true"); // link provided 
System.setProperty("spark.akka.frameSize", "500"); // workers should be able to send bigger messages 
System.setProperty("spark.akka.askTimeout", "30"); // high CPU/IO load 

注意:增加幀大小預防就顯得尤爲有用:org.apache.spark.SparkException: Error communicating with MapOutputTracker

+1

'spark.akka.frameSize'也解決了我的'org.apache.spark.SparkException:錯誤與MapOutputTracker'通信問題。 – samthebest

+0

是否系統。setProperty()也適用於spark-shell?我無法獲取frameSize集 –